This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8de3a2fba9f [improvement](fe) Add configurable return mode for insert
publish timeout in ETL scenarios (#63919)
8de3a2fba9f is described below
commit 8de3a2fba9fa75c5d403912958be5aed6cc7e949
Author: Wen Zhenghu <[email protected]>
AuthorDate: Fri Jun 5 15:06:35 2026 +0800
[improvement](fe) Add configurable return mode for insert publish timeout
in ETL scenarios (#63919)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Normal internal-table inserts currently treat publish timeout as a
committed insert and return success with COMMITTED status. This behavior
is acceptable when clients only care that the transaction has been
committed and can tolerate delayed visibility, but it is unsafe for
pipelines whose downstream steps depend on the inserted data already
being visible.
A typical case is ETL workflows that first use CREATE TABLE AS SELECT to
build a temporary table and then immediately read that table to populate
a downstream result table. If the upstream transaction has been
committed but is not yet VISIBLE, the downstream step may temporarily
read no rows and silently write empty data into the final table, so the
whole pipeline appears successful even though the result is incorrect.
Doris already returns an error in explicit transaction mode when a
COMMIT statement times out before the transaction becomes visible. This
change adds a compatible mode for the regular non-transactional
internal-table insert path by introducing a session variable,
insert_visible_timeout_return_mode, so users can choose whether publish
timeout should keep returning COMMITTED or return ERR.
The implementation also keeps committed-side bookkeeping unchanged in
error mode so finished load jobs, insert result metadata, and related
accounting still reflect the real transaction state.
### Release note
Add a session variable to control whether normal internal-table inserts
return COMMITTED or ERR when publish visibility times out.
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [x] Regression test
- [x] Unit Test
- [x] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../plans/commands/insert/OlapInsertExecutor.java | 16 ++
.../java/org/apache/doris/qe/SessionVariable.java | 71 ++++++
.../commands/insert/OlapInsertExecutorTest.java | 274 +++++++++++++++++++++
.../org/apache/doris/qe/SessionVariablesTest.java | 88 ++++++-
.../test_insert_visible_timeout_return_mode.out | 5 +
.../test_insert_visible_timeout_return_mode.groovy | 83 +++++++
6 files changed, 535 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index a1e22fd2902..c4ae68acb98 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -46,6 +46,7 @@ import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
@@ -78,7 +79,12 @@ import java.util.stream.Collectors;
*/
public class OlapInsertExecutor extends AbstractInsertExecutor {
private static final Logger LOG =
LogManager.getLogger(OlapInsertExecutor.class);
+ // Keep the timeout message aligned with the client-facing error returned
by the legacy insert path.
+ private static final String INSERT_VISIBLE_TIMEOUT_ERROR_MSG =
"transaction commit successfully, "
+ + "BUT data did not become visible within
insert_visible_timeout_ms and will be visible later.";
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
+ // Track publish timeout separately from real failures so committed
bookkeeping still runs.
+ protected boolean publishTimedOutAfterCommit = false;
protected OlapTable olapTable;
@@ -236,7 +242,9 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
ctx.getSessionVariable().getInsertVisibleTimeoutMs(),
txnCommitAttachment)) {
txnStatus = TransactionStatus.VISIBLE;
} else {
+ // Keep the committed status so load accounting and insert result
bookkeeping stay aligned.
txnStatus = TransactionStatus.COMMITTED;
+ publishTimedOutAfterCommit = true;
}
if (Config.isCloudMode()) {
String clusterName = ctx.getCloudCluster();
@@ -370,6 +378,14 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
txnStatus, loadedRows, filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
+ if (publishTimedOutAfterCommit &&
ctx.getSessionVariable().isInsertVisibleTimeoutReturnError()) {
+ // Log the committed timeout branch explicitly so operators can
distinguish it from real failures.
+ LOG.warn("insert [{}] with txn id {} committed but return error
because {}={}",
+ labelName, txnId,
SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE,
+ SessionVariable.InsertVisibleTimeoutReturnMode.ERROR);
+ // Convert the final client response to ERR after all
committed-side bookkeeping has finished.
+ ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
INSERT_VISIBLE_TIMEOUT_ERROR_MSG);
+ }
}
public long getTimeout() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2902615afef..3f6da2fed73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -269,6 +269,30 @@ public class SessionVariable implements Serializable,
Writable {
// max ms to wait transaction publish finish when exec insert stmt.
public static final String INSERT_VISIBLE_TIMEOUT_MS =
"insert_visible_timeout_ms";
+ public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE =
"insert_visible_timeout_return_mode";
+ public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED =
"committed";
+ public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR =
"error";
+
+ // Keep the mode enum for business logic while storing the session value
as a string.
+ public enum InsertVisibleTimeoutReturnMode {
+ COMMITTED(INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED),
+ ERROR(INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR);
+
+ private final String option;
+
+ InsertVisibleTimeoutReturnMode(String option) {
+ this.option = option;
+ }
+
+ public String getOption() {
+ return option;
+ }
+
+ @Override
+ public String toString() {
+ return option;
+ }
+ }
public static final String DELETE_WITHOUT_PARTITION =
"delete_without_partition";
@@ -1100,6 +1124,15 @@ public class SessionVariable implements Serializable,
Writable {
@VarAttrDef.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true)
public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
+ // Control whether publish timeout keeps the committed response or returns
an explicit error.
+ @VarAttrDef.VarAttr(name = INSERT_VISIBLE_TIMEOUT_RETURN_MODE, needForward
= true,
+ checker = "checkInsertVisibleTimeoutReturnMode", setter =
"setInsertVisibleTimeoutReturnMode",
+ description = {"控制普通内表 INSERT 在 publish timeout 时返回给客户端的状态。",
+ "Controls the status returned to the client when a normal
internal-table INSERT times out "
+ + "while waiting for publish visibility."},
+ options = {INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR})
+ public String insertVisibleTimeoutReturnMode =
INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED;
+
// max memory used on every backend. Default value to 100G.
@VarAttrDef.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
public long maxExecMemByte = 100147483648L;
@@ -4938,6 +4971,23 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public String getInsertVisibleTimeoutReturnMode() {
+ return getInsertVisibleTimeoutReturnModeEnum().getOption();
+ }
+
+ public InsertVisibleTimeoutReturnMode
getInsertVisibleTimeoutReturnModeEnum() {
+ return
parseInsertVisibleTimeoutReturnMode(insertVisibleTimeoutReturnMode);
+ }
+
+ public boolean isInsertVisibleTimeoutReturnError() {
+ return getInsertVisibleTimeoutReturnModeEnum() ==
InsertVisibleTimeoutReturnMode.ERROR;
+ }
+
+ public void setInsertVisibleTimeoutReturnMode(String
insertVisibleTimeoutReturnMode) {
+ this.insertVisibleTimeoutReturnMode =
parseInsertVisibleTimeoutReturnMode(insertVisibleTimeoutReturnMode)
+ .getOption();
+ }
+
public boolean getIsSingleSetVar() {
return isSingleSetVar;
}
@@ -5326,6 +5376,27 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public void checkInsertVisibleTimeoutReturnMode(String mode) {
+ // Reuse the parser so validation stays consistent with assignment and
enum access.
+ parseInsertVisibleTimeoutReturnMode(mode);
+ }
+
+ // Parse the stored string case-insensitively and expose the enum only to
business logic.
+ private InsertVisibleTimeoutReturnMode
parseInsertVisibleTimeoutReturnMode(String mode) {
+ if (StringUtils.isEmpty(mode)) {
+ LOG.warn("insertVisibleTimeoutReturnMode value is empty");
+ throw new
UnsupportedOperationException("insertVisibleTimeoutReturnMode value is empty");
+ }
+ for (InsertVisibleTimeoutReturnMode value :
InsertVisibleTimeoutReturnMode.values()) {
+ if (value.getOption().equalsIgnoreCase(mode)) {
+ return value;
+ }
+ }
+ LOG.warn("insertVisibleTimeoutReturnMode value is invalid, the invalid
value is {}", mode);
+ throw new UnsupportedOperationException(
+ "insertVisibleTimeoutReturnMode value is invalid, the invalid
value is " + mode);
+ }
+
public void checkMaxExecutionTimeMSValid(String newValue) {
int value = Integer.valueOf(newValue);
if (value < 1000) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
new file mode 100644
index 00000000000..ea5318eedcb
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HiveTransactionMgr;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.job.manager.StreamingTaskManager;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.InsertResult;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgrIface;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+/**
+ * Tests for publish-timeout behaviors in {@link OlapInsertExecutor}.
+ */
+class OlapInsertExecutorTest {
+
+ @AfterEach
+ void tearDown() {
+ ConnectContext.remove();
+ }
+
+ @Test
+ void
testExecuteSingleInsertPublishTimeoutReturnErrorKeepsCommittedAccounting()
throws Exception {
+ ConnectContext ctx = createExecutorContext();
+ ctx.getSessionVariable().setInsertVisibleTimeoutReturnMode(
+ SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR);
+
+ Coordinator coordinator = createCoordinator();
+ GlobalTransactionMgrIface txnMgr =
Mockito.mock(GlobalTransactionMgrIface.class);
+ TransactionState txnState = Mockito.mock(TransactionState.class);
+ LoadManager loadManager = Mockito.mock(LoadManager.class);
+ Env currentEnv = createCurrentEnv(loadManager);
+ StmtExecutor stmtExecutor = createStmtExecutor();
+
+ try (MockedStatic<EnvFactory> envFactoryMock =
Mockito.mockStatic(EnvFactory.class);
+ MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+ prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr,
txnState, currentEnv);
+ ctx.setEnv(currentEnv);
+
+ Mockito.when(txnMgr.commitAndPublishTransaction(
+ Mockito.any(), Mockito.anyList(), Mockito.anyLong(),
Mockito.anyList(), Mockito.anyLong(),
+ Mockito.isNull())).thenReturn(false);
+
+ OlapInsertExecutor executor = createExecutor(ctx);
+ executor.txnId = 10001L;
+ executor.executeSingleInsert(stmtExecutor);
+
+ Assertions.assertEquals(TransactionStatus.COMMITTED,
executor.txnStatus);
+ Assertions.assertEquals(MysqlStateType.ERR,
ctx.getState().getStateType());
+ Assertions.assertTrue(ctx.getState().getErrorMessage().contains(
+ "transaction commit successfully, BUT data did not become
visible within "
+ + "insert_visible_timeout_ms and will be visible
later."));
+
+ InsertResult insertResult = ctx.getInsertResult();
+ Assertions.assertNotNull(insertResult);
+ Assertions.assertEquals(TransactionStatus.COMMITTED,
insertResult.txnStatus);
+ Assertions.assertEquals(12L, insertResult.loadedRows);
+ Assertions.assertEquals(1L, insertResult.filteredRows);
+ Assertions.assertEquals(12L, ctx.getReturnRows());
+
+
Mockito.verify(loadManager).recordFinishedLoadJob(Mockito.eq("label_test"),
Mockito.eq(10001L),
+ Mockito.eq("test_db"), Mockito.eq(2L),
Mockito.eq(EtlJobType.INSERT), Mockito.anyLong(),
+ Mockito.eq(""), Mockito.isNull(), Mockito.isNull(),
Mockito.eq(UserIdentity.ROOT),
+ Mockito.anyLong());
+ Mockito.verify(txnMgr,
Mockito.never()).abortTransaction(Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyString());
+ }
+ }
+
+ @Test
+ void testPublishTimeoutCommittedModeReturnsOk() throws Exception {
+ ConnectContext ctx = createExecutorContext();
+ Coordinator coordinator = createCoordinator();
+ GlobalTransactionMgrIface txnMgr =
Mockito.mock(GlobalTransactionMgrIface.class);
+ TransactionState txnState = Mockito.mock(TransactionState.class);
+ LoadManager loadManager = Mockito.mock(LoadManager.class);
+ Env currentEnv = createCurrentEnv(loadManager);
+ StmtExecutor stmtExecutor = createStmtExecutor();
+
+ try (MockedStatic<EnvFactory> envFactoryMock =
Mockito.mockStatic(EnvFactory.class);
+ MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+ prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr,
txnState, currentEnv);
+ ctx.setEnv(currentEnv);
+
+ Mockito.when(txnMgr.commitAndPublishTransaction(
+ Mockito.any(), Mockito.anyList(), Mockito.anyLong(),
Mockito.anyList(), Mockito.anyLong(),
+ Mockito.isNull())).thenReturn(false);
+
+ OlapInsertExecutor executor = createExecutor(ctx);
+ executor.txnId = 10002L;
+ executor.executeSingleInsert(stmtExecutor);
+
+ Assertions.assertEquals(TransactionStatus.COMMITTED,
executor.txnStatus);
+ Assertions.assertEquals(MysqlStateType.OK,
ctx.getState().getStateType());
+
Assertions.assertTrue(ctx.getState().getInfoMessage().contains("'status':'COMMITTED'"));
+
+ InsertResult insertResult = ctx.getInsertResult();
+ Assertions.assertNotNull(insertResult);
+ Assertions.assertEquals(TransactionStatus.COMMITTED,
insertResult.txnStatus);
+ Assertions.assertEquals(12L, insertResult.loadedRows);
+ Assertions.assertEquals(1L, insertResult.filteredRows);
+ Assertions.assertEquals(12L, ctx.getReturnRows());
+
+ Mockito.verify(txnMgr,
Mockito.never()).abortTransaction(Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyString());
+ }
+ }
+
+ @Test
+ void testOnFailAbortsUncommittedTransaction() throws Exception {
+ ConnectContext ctx = createExecutorContext();
+ Coordinator coordinator = createCoordinator();
+ GlobalTransactionMgrIface txnMgr =
Mockito.mock(GlobalTransactionMgrIface.class);
+ TransactionState txnState = Mockito.mock(TransactionState.class);
+ LoadManager loadManager = Mockito.mock(LoadManager.class);
+ Env currentEnv = createCurrentEnv(loadManager);
+
+ try (MockedStatic<EnvFactory> envFactoryMock =
Mockito.mockStatic(EnvFactory.class);
+ MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+ prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr,
txnState, currentEnv);
+ ctx.setEnv(currentEnv);
+
+ // Simulate a pre-commit failure so the executor must abort the
transaction.
+ OlapInsertExecutor executor = createExecutor(ctx);
+ executor.txnId = 10003L;
+ executor.txnStatus = TransactionStatus.ABORTED;
+
+ executor.onFail(new RuntimeException("pre-commit failure"));
+
+ Assertions.assertEquals(MysqlStateType.ERR,
ctx.getState().getStateType());
+
Assertions.assertTrue(ctx.getState().getErrorMessage().contains("pre-commit
failure"));
+ Assertions.assertNull(ctx.getInsertResult());
+ Mockito.verify(txnMgr).abortTransaction(Mockito.eq(1L),
Mockito.eq(10003L),
+ Mockito.eq("pre-commit failure"));
+ }
+ }
+
+ // Build a fresh context per case so insertResult and QueryState do not
leak between tests.
+ private ConnectContext createExecutorContext() {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setThreadLocalInfo();
+ ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+ ctx.setQueryId(new TUniqueId(1, 2));
+ // Disable strict insert mode because this test intentionally keeps
one filtered row.
+ ctx.getSessionVariable().setEnableInsertStrict(false);
+ ctx.getState().reset();
+ ctx.resetReturnRows();
+ return ctx;
+ }
+
+ // Prepare the mocked coordinator so the executor can run its completion
logic without real execution.
+ private Coordinator createCoordinator() throws Exception {
+ Coordinator coordinator = Mockito.mock(Coordinator.class);
+ // Return non-null query options because master registers the
coordinator in QeProcessor.
+ TQueryOptions queryOptions = new TQueryOptions();
+ Mockito.when(coordinator.join(Mockito.anyInt())).thenReturn(true);
+ Mockito.when(coordinator.isDone()).thenReturn(true);
+ Mockito.when(coordinator.getExecStatus()).thenReturn(new
Status(TStatusCode.OK, ""));
+ Mockito.when(coordinator.getQueryOptions()).thenReturn(queryOptions);
+
Mockito.when(coordinator.getCommitInfos()).thenReturn(Lists.newArrayList());
+ Mockito.when(coordinator.getTrackingUrl()).thenReturn(null);
+ Mockito.when(coordinator.getFirstErrorMsg()).thenReturn(null);
+
Mockito.when(coordinator.getExecutionProfile()).thenReturn(Mockito.mock(ExecutionProfile.class));
+ Mockito.when(coordinator.getLoadCounters()).thenReturn(ImmutableMap.of(
+ "dpp.norm.ALL", "12",
+ "dpp.abnorm.ALL", "1"));
+ return coordinator;
+ }
+
+ // Use a mocked executor so executeSingleInsert can run the real control
flow without a full query setup.
+ private StmtExecutor createStmtExecutor() {
+ StmtExecutor stmtExecutor = Mockito.mock(StmtExecutor.class);
+
Mockito.when(stmtExecutor.getProfile()).thenReturn(Mockito.mock(Profile.class));
+
Mockito.when(stmtExecutor.getSummaryProfile()).thenReturn(Mockito.mock(SummaryProfile.class));
+ Mockito.when(stmtExecutor.getOriginStmtInString()).thenReturn("insert
into test_tbl select 1");
+ Mockito.when(stmtExecutor.getParsedStmt()).thenReturn(null);
+ Mockito.when(stmtExecutor.isProfileSafeStmt()).thenReturn(false);
+ return stmtExecutor;
+ }
+
+ // Provide the job-manager chain needed by master-side setTxnCallbackId().
+ private Env createCurrentEnv(LoadManager loadManager) {
+ Env currentEnv = Mockito.mock(Env.class);
+ // Mock the internal catalog because ConnectContext.setEnv() resolves
the default catalog on master.
+ InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+ JobManager<?, ?> jobManager = Mockito.mock(JobManager.class);
+ StreamingTaskManager streamingTaskManager =
Mockito.mock(StreamingTaskManager.class);
+
Mockito.when(currentEnv.getInternalCatalog()).thenReturn(internalCatalog);
+ Mockito.when(internalCatalog.getName()).thenReturn("internal");
+ Mockito.when(currentEnv.getLoadManager()).thenReturn(loadManager);
+ Mockito.when(currentEnv.getJobManager()).thenReturn(jobManager);
+
Mockito.when(jobManager.getStreamingTaskManager()).thenReturn(streamingTaskManager);
+
Mockito.when(streamingTaskManager.getStreamingInsertTaskById(Mockito.anyLong())).thenReturn(null);
+ return currentEnv;
+ }
+
+ // Create an executor with mocked table metadata because this test only
validates timeout result handling.
+ private OlapInsertExecutor createExecutor(ConnectContext ctx) {
+ Database database = Mockito.mock(Database.class);
+ Mockito.when(database.getFullName()).thenReturn("test_db");
+ Mockito.when(database.getId()).thenReturn(1L);
+
+ // Mock OlapTable because the master-side executor now casts the
target table to OlapTable.
+ OlapTable table = Mockito.mock(OlapTable.class);
+ Mockito.when(table.getDatabase()).thenReturn(database);
+ Mockito.when(table.getName()).thenReturn("test_tbl");
+ Mockito.when(table.getId()).thenReturn(2L);
+
+ return new OlapInsertExecutor(ctx, table, "label_test",
Mockito.mock(NereidsPlanner.class),
+ Optional.empty(), false, 0L);
+ }
+
+ // Redirect coordinator creation and transaction access to mocks so the
test stays deterministic.
+ private void prepareFactoryMocks(MockedStatic<EnvFactory> envFactoryMock,
MockedStatic<Env> envMock,
+ Coordinator coordinator, GlobalTransactionMgrIface txnMgr,
TransactionState txnState, Env currentEnv) {
+ EnvFactory envFactory = Mockito.mock(EnvFactory.class);
+ HiveTransactionMgr hiveTransactionMgr =
Mockito.mock(HiveTransactionMgr.class);
+ envFactoryMock.when(EnvFactory::getInstance).thenReturn(envFactory);
+ Mockito.when(envFactory.createCoordinator(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.anyLong()))
+ .thenReturn(coordinator);
+
+ envMock.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+
envMock.when(Env::getCurrentHiveTransactionMgr).thenReturn(hiveTransactionMgr);
+ envMock.when(Env::getCurrentEnv).thenReturn(currentEnv);
+ Mockito.when(txnMgr.getTransactionState(Mockito.anyLong(),
Mockito.anyLong())).thenReturn(txnState);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 06bf0065baa..b33a3ef0ad5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -20,8 +20,10 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.SetVar;
+import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.utframe.TestWithFeService;
@@ -32,6 +34,7 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.lang.reflect.Field;
+import java.util.HashMap;
import java.util.Map;
public class SessionVariablesTest extends TestWithFeService {
@@ -44,8 +47,7 @@ public class SessionVariablesTest extends TestWithFeService {
FeConstants.runningUnitTest = true;
createDatabase("test_d");
useDatabase("test_d");
- createTable("create table test_t1 \n" + "(k1 int, k2 int) distributed
by hash(k1) buckets 1\n"
- + "properties(\"replication_num\" = \"1\");");
+ // Skip creating an OLAP table because these cases only validate
session-variable behavior and parsing.
sessionVariable = new SessionVariable();
Field[] fields = SessionVariable.class.getDeclaredFields();
@@ -66,8 +68,13 @@ public class SessionVariablesTest extends TestWithFeService {
Assertions.assertEquals(numOfForwardVars, vars.size());
vars.put(SessionVariable.ENABLE_PROFILE, "true");
+ vars.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, "ERROR");
sessionVariable.setForwardedSessionVariables(vars);
Assertions.assertTrue(sessionVariable.enableProfile);
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+ sessionVariable.getInsertVisibleTimeoutReturnMode());
+
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+ sessionVariable.getInsertVisibleTimeoutReturnModeEnum());
}
@Test
@@ -82,6 +89,83 @@ public class SessionVariablesTest extends TestWithFeService {
sessionVariableClone.getSessionOriginValue().get(txIsolationSessionVariableField));
}
+ @Test
+ public void testInsertVisibleTimeoutReturnMode() throws Exception {
+ connectContext.setThreadLocalInfo();
+ SessionVariable sessionVar = connectContext.getSessionVariable();
+
+ VariableMgr.setVar(sessionVar, new SetVar(SetType.SESSION,
+ SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, new
StringLiteral("ERROR")));
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+ sessionVar.getInsertVisibleTimeoutReturnMode());
+
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+ sessionVar.getInsertVisibleTimeoutReturnModeEnum());
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+
sessionVar.getForwardVariables().get(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE));
+
+ SessionVariable restored = new SessionVariable();
+
restored.readFromJson("{\"insert_visible_timeout_return_mode\":\"ERROR\"}");
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+ restored.getInsertVisibleTimeoutReturnMode());
+
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+ restored.getInsertVisibleTimeoutReturnModeEnum());
+
+ // Verify map restore keeps accepting canonical string tokens without
extra normalization hooks.
+ Map<String, String> restoredMap = new HashMap<>();
+ restoredMap.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE,
"ERROR");
+ restored.readFromMap(restoredMap);
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+ restored.getInsertVisibleTimeoutReturnMode());
+
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+ restored.getInsertVisibleTimeoutReturnModeEnum());
+
+ Map<String, String> forwardVars = sessionVar.getForwardVariables();
+ forwardVars.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE,
"ERROR");
+ restored.setForwardedSessionVariables(forwardVars);
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+ restored.getInsertVisibleTimeoutReturnMode());
+
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+ restored.getInsertVisibleTimeoutReturnModeEnum());
+
+ Field field =
SessionVariable.class.getDeclaredField("insertVisibleTimeoutReturnMode");
+ VarAttrDef.VarAttr varAttr =
field.getAnnotation(VarAttrDef.VarAttr.class);
+ Assertions.assertArrayEquals(new String[] {
+ "控制普通内表 INSERT 在 publish timeout 时返回给客户端的状态。",
+ "Controls the status returned to the client when a normal
internal-table INSERT times out "
+ + "while waiting for publish visibility."
+ }, varAttr.description());
+ Assertions.assertArrayEquals(new String[] {
+ SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
+ SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR
+ }, varAttr.options());
+
+ ExceptionChecker.expectThrowsWithMsg(DdlException.class,
+ "insertVisibleTimeoutReturnMode value is invalid",
+ () -> VariableMgr.setVar(sessionVar, new
SetVar(SetType.SESSION,
+ SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE,
new StringLiteral("unexpected"))));
+ }
+
+ @Test
+ public void testInsertVisibleTimeoutReturnModeDefaultsAndCheckerBranches()
{
+ // Cover the default branch and the helper methods used by
setter/checker paths.
+ SessionVariable sessionVar = new SessionVariable();
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
+ sessionVar.getInsertVisibleTimeoutReturnMode());
+ Assertions.assertFalse(sessionVar.isInsertVisibleTimeoutReturnError());
+
+ // Verify setter normalization is case-insensitive and stores the
canonical lowercase value.
+ sessionVar.setInsertVisibleTimeoutReturnMode("ErRoR");
+
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+ sessionVar.getInsertVisibleTimeoutReturnMode());
+
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+ sessionVar.getInsertVisibleTimeoutReturnModeEnum());
+ Assertions.assertTrue(sessionVar.isInsertVisibleTimeoutReturnError());
+
+
ExceptionChecker.expectThrowsWithMsg(UnsupportedOperationException.class,
+ "insertVisibleTimeoutReturnMode value is empty",
+ () -> sessionVar.checkInsertVisibleTimeoutReturnMode(""));
+ }
+
@Test
public void testSetVarInHint() {
String sql = "insert into test_t1 select /*+
set_var(enable_nereids_dml_with_pipeline=false)*/ * from test_t1 where
enable_nereids_dml_with_pipeline=true";
diff --git
a/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
b/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
new file mode 100644
index 00000000000..fa5640a23c0
--- /dev/null
+++ b/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !final_select --
+1 10
+2 20
+
diff --git
a/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
b/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
new file mode 100644
index 00000000000..978bd137d8e
--- /dev/null
+++
b/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_insert_visible_timeout_return_mode", "nonConcurrent") {
+ if (isCloudMode()) {
+ return
+ }
+
+ def tableName = "test_insert_visible_timeout_return_mode_tbl"
+ def debugPoint = "PublishVersionDaemon.stop_publish"
+ // Use the configured FE HTTP endpoint so the case also works when SHOW
FRONTENDS exposes loopback addresses.
+ def feHttpAddress = context.config.feHttpAddress
+ def feHost = feHttpAddress.split(":")[0]
+ def feHttpPort = Integer.parseInt(feHttpAddress.split(":")[1])
+
+ // Prepare a single-replica table so publish blocking deterministically
drives the visible timeout path.
+ sql """ DROP TABLE IF EXISTS ${tableName} FORCE """
+ sql """
+ CREATE TABLE ${tableName} (
+ `k1` INT,
+ `k2` INT
+ )
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ try {
+ // Block FE publish so inserts can commit but remain non-visible until
the debug point is removed.
+ DebugPoint.enableDebugPoint(feHost, feHttpPort, NodeType.FE,
debugPoint)
+
+ sql """ SET insert_visible_timeout_ms = 1000 """
+
+ // Verify the default committed mode returns success after the visible
wait times out.
+ sql """ SET insert_visible_timeout_return_mode = 'committed' """
+ sql """ INSERT INTO ${tableName} VALUES (1, 10) """
+
+ // Verify the error mode returns the publish-timeout error to the
client while keeping the txn committed.
+ sql """ SET insert_visible_timeout_return_mode = 'error' """
+ test {
+ sql """ INSERT INTO ${tableName} VALUES (2, 20) """
+ exception "transaction commit successfully, BUT data did not
become visible within insert_visible_timeout_ms and will be visible later."
+ }
+ } finally {
+ try {
+ DebugPoint.disableDebugPoint(feHost, feHttpPort, NodeType.FE,
debugPoint)
+ } catch (Throwable e) {
+ logger.warn("Failed to disable debug point ${debugPoint}", e)
+ }
+ }
+
+ // Wait for FE publish to resume so both committed transactions become
visible before checking final data.
+ def visible = false
+ for (int i = 0; i < 15; i++) {
+ def rowCount = sql """ SELECT COUNT(*) FROM ${tableName} """
+ if ((rowCount[0][0] as long) == 2L) {
+ visible = true
+ break
+ }
+ sleep(1000)
+ }
+ assertTrue(visible, "Rows should become visible after publish resumes")
+
+ order_qt_final_select """ SELECT * FROM ${tableName} ORDER BY k1 """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]