This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5766047c616 branch-3.0: [fix](group commit) fix group commit with schema change (#51144) (#51306)
5766047c616 is described below
commit 5766047c616a29a0fb389203bd4b0735bbb17f2b
Author: meiyi <[email protected]>
AuthorDate: Thu May 29 10:07:54 2025 +0800
branch-3.0: [fix](group commit) fix group commit with schema change (#51144) (#51306)
pick https://github.com/apache/doris/pull/51144
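
Reviewer context: group commit merges many small loads into one shared
internal load per table, and an incoming load reuses an existing
LoadBlockQueue when it matches that queue's schema. Before this fix the
match compared only schema versions; during a schema change the FE creates
shadow indexes, so two loads can report the same base schema version while
seeing different index lists, and batching them together can fail. The
patch threads the index count through the sink, the manager, and the
queue, and requires both fields to agree. A minimal sketch of the matching
rule the diff enforces (a hypothetical standalone helper, not code from
the patch):

    // Sketch only: minimal stand-in for the fields this patch adds.
    struct LoadBlockQueueView {
        int64_t schema_version;
        int64_t index_size;
    };

    // A queue is reusable only when the incoming load was planned against
    // the same schema version AND sees the same number of indexes
    // (shadow indexes change the count mid-schema-change).
    bool queue_matches(const LoadBlockQueueView& q, int64_t base_schema_version,
                       int64_t index_size) {
        return q.schema_version == base_schema_version &&
               q.index_size == index_size;
    }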
---
be/src/olap/wal/wal_table.cpp | 1 +
.../exec/group_commit_block_sink_operator.cpp | 8 +-
be/src/runtime/group_commit_mgr.cpp | 36 +++---
be/src/runtime/group_commit_mgr.h | 15 ++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 17 +++
.../apache/doris/service/FrontendServiceImpl.java | 4 +
.../test_group_commit_schema_change.out | Bin 0 -> 115 bytes
.../group_commit/test_group_commit_error.groovy | 19 +++
.../test_group_commit_replay_wal.groovy | 15 ++-
.../test_group_commit_schema_change.groovy | 135 +++++++++++++++++++++
10 files changed, 224 insertions(+), 26 deletions(-)
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index a671717b50f..aed180c86a3 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -251,6 +251,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
ctx->group_commit = false;
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
+ ctx->max_filter_ratio = 1;
auto st = _http_stream_action->process_put(nullptr, ctx);
if (st.ok()) {
// wait stream load finish
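
Note on the one-line wal_table.cpp change above: WAL replay re-runs the
load as an internal stream load, and setting max_filter_ratio to 1 lets
replay succeed even when some rows are no longer loadable (for example,
rows targeting a partition dropped after the WAL was written) instead of
retrying the WAL indefinitely. The adjusted expectations in
test_group_commit_replay_wal.groovy below exercise exactly this path.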
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index a9201f0302f..8b1ef1be95a 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -68,9 +68,9 @@ Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
-                _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
-                _put_block_dependency));
+                p._db_id, p._table_id, p._base_schema_version, p._schema->indexes().size(),
+                p._load_id, _load_block_queue, _state->be_exec_version(),
+                _state->query_mem_tracker(), _create_plan_dependency, _put_block_dependency));
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
return Status::OK();
@@ -259,7 +259,7 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_db_id = table_sink.db_id;
_table_id = table_sink.table_id;
- _base_schema_version = table_sink.base_schema_version;
+ _base_schema_version = _schema->version();
_partition = table_sink.partition;
_group_commit_mode = table_sink.group_commit_mode;
_load_id = table_sink.load_id;
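
Also note that _base_schema_version is now taken from the deserialized
schema itself (_schema->version()) rather than the separate
base_schema_version field in the sink descriptor, so the version always
comes from the same schema snapshot as the index count passed above
(p._schema->indexes().size()).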
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index f49d6708bcc..0f22dbf4573 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -46,6 +46,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
return runtime_state->cancel_reason();
}
RETURN_IF_ERROR(status);
+ DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK);
if (block->rows() > 0) {
if (!config::group_commit_wait_replay_wal_finish) {
_block_queue.emplace_back(block);
@@ -144,7 +145,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
                        << ", duration=" << duration << ", load_ids=" << get_load_ids();
}
}
- if (!_need_commit && !timer_dependency->ready()) {
+ if (!_need_commit) {
get_block_dep->block();
VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
}
@@ -254,7 +255,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
}
Status GroupCommitTable::get_first_block_load_queue(
-        int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
+        int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
@@ -270,7 +271,8 @@ Status GroupCommitTable::get_first_block_load_queue(
}
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
- if (base_schema_version == inner_block_queue->schema_version) {
+ if (base_schema_version == inner_block_queue->schema_version &&
+ index_size == inner_block_queue->index_size) {
                if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
load_block_queue = inner_block_queue;
return Status::OK();
@@ -290,8 +292,8 @@ Status GroupCommitTable::get_first_block_load_queue(
return Status::OK();
}
create_plan_dep->block();
-    _create_plan_deps.emplace(load_id,
-                              std::make_tuple(create_plan_dep, put_block_dep, base_schema_version));
+    _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep,
+                                                       base_schema_version, index_size));
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
RETURN_IF_ERROR(
@@ -378,18 +380,21 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
         LOG(WARNING) << "create group commit load error, st=" << st.to_string();
return st;
}
- auto schema_version = result.base_schema_version;
auto& pipeline_params = result.pipeline_params;
+    auto schema_version = pipeline_params.fragment.output_sink.olap_table_sink.schema.version;
+    auto index_size =
+            pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size();
    DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id);
txn_id = pipeline_params.txn_conf.txn_id;
DCHECK(pipeline_params.local_params.size() == 1);
instance_id = pipeline_params.local_params[0].fragment_instance_id;
     VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id
-               << ", schema version=" << schema_version << ", label=" << label
-               << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id);
+               << ", schema version=" << schema_version << ", index size=" << index_size
+               << ", label=" << label << ", txn_id=" << txn_id
+               << ", instance_id=" << print_id(instance_id);
{
auto load_block_queue = std::make_shared<LoadBlockQueue>(
-                instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
+                instance_id, label, txn_id, schema_version, index_size, _all_block_queues_bytes,
                 result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
                 result.group_commit_data_bytes);
RETURN_IF_ERROR(load_block_queue->create_wal(
@@ -403,7 +408,8 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
for (const auto& [id, load_info] : _create_plan_deps) {
auto create_dep = std::get<0>(load_info);
auto put_dep = std::get<1>(load_info);
-            if (load_block_queue->schema_version == std::get<2>(load_info)) {
+            if (load_block_queue->schema_version == std::get<2>(load_info) &&
+                load_block_queue->index_size == std::get<3>(load_info)) {
if (load_block_queue->add_load_id(id, put_dep).ok()) {
create_dep->set_ready();
success_load_ids.emplace_back(id);
@@ -624,9 +630,9 @@ void GroupCommitMgr::stop() {
}
Status GroupCommitMgr::get_first_block_load_queue(
-        int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
-        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker,
+        int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t index_size,
+        const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue,
+        int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
@@ -640,8 +646,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
group_commit_table = _table_map[table_id];
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
-            table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
-            create_plan_dep, put_block_dep));
+            table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version,
+            mem_tracker, create_plan_dep, put_block_dep));
return Status::OK();
}
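
Two details in group_commit_mgr.cpp worth calling out: the schema version
and index count for a newly created queue are now read from the planned
fragment's olap_table_sink schema rather than from
result.base_schema_version, and loads parked while the plan is being
created are recorded as the 4-field tuple <create_plan_dep, put_block_dep,
base_schema_version, index_size>, so only waiters whose recorded pair
matches the new queue are admitted to it. The simplified condition in
get_block (dropping the timer_dependency check) appears to make the
consumer block solely until the queue decides it needs to commit.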
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 32579547893..2be17400026 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -55,7 +55,7 @@ struct BlockData {
class LoadBlockQueue {
public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
- int64_t schema_version,
+ int64_t schema_version, int64_t index_size,
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
                    bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms,
int64_t group_commit_data_bytes)
@@ -63,6 +63,7 @@ public:
label(label),
txn_id(txn_id),
schema_version(schema_version),
+ index_size(index_size),
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_group_commit_interval_ms(group_commit_interval_ms),
_start_time(std::chrono::steady_clock::now()),
@@ -108,6 +109,7 @@ public:
std::string label;
int64_t txn_id;
int64_t schema_version;
+ int64_t index_size;
bool wait_internal_group_commit_finish = false;
bool data_size_condition = false;
@@ -157,7 +159,7 @@ public:
_db_id(db_id),
_table_id(table_id) {};
     Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
-                                      const UniqueId& load_id,
+                                      int64_t index_size, const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> mem_tracker,
@@ -189,9 +191,10 @@ private:
// fragment_instance_id to load_block_queue
    std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
bool _is_creating_plan_fragment = false;
-    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version>
-    std::unordered_map<UniqueId, std::tuple<std::shared_ptr<pipeline::Dependency>,
-                                            std::shared_ptr<pipeline::Dependency>, int64_t>>
+    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version, index_size>
+    std::unordered_map<UniqueId,
+                       std::tuple<std::shared_ptr<pipeline::Dependency>,
+                                  std::shared_ptr<pipeline::Dependency>, int64_t, int64_t>>
_create_plan_deps;
};
@@ -207,7 +210,7 @@ public:
                                std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                std::shared_ptr<pipeline::Dependency> get_block_dep);
     Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
-                                      const UniqueId& load_id,
+                                      int64_t index_size, const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> mem_tracker,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index ea53a931131..b3136b41b04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -48,6 +48,7 @@ import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
@@ -377,6 +378,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// create all replicas success.
// add all shadow indexes to catalog
+        while (DebugPointUtil.isEnable("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")) {
+ try {
+ Thread.sleep(1000);
+ LOG.info("block addShadowIndexToCatalog for job: {}", jobId);
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException: ", e);
+ }
+ }
tbl.writeLockOrAlterCancelException();
try {
             Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
@@ -609,6 +618,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
return;
}
+        while (DebugPointUtil.isEnable("FE.SchemaChangeJobV2.runRunning.block")) {
+ try {
+ Thread.sleep(1000);
+ LOG.info("block schema change for job: {}", jobId);
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException: ", e);
+ }
+ }
Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId);
Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
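
The two FE debug points added above are simple poll loops: while a point
is enabled the thread sleeps in one-second steps, which lets the new
regression test freeze a schema change job either right before the shadow
indexes are added to the catalog or right before the RUNNING phase blocks
group commit WALs (blockTable / waitWalFinished / unblockTable), then
release it on demand.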
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index bbf8555f0fd..9140b033f5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2174,6 +2174,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
String originStmt = request.getLoadSql();
HttpStreamParams httpStreamParams;
try {
+            while (DebugPointUtil.isEnable("FE.FrontendServiceImpl.initHttpStreamPlan.block")) {
+ Thread.sleep(1000);
+ LOG.info("block initHttpStreamPlan");
+ }
StmtExecutor executor = new StmtExecutor(ctx, originStmt);
ctx.setExecutor(executor);
httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
diff --git a/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out b/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out
new file mode 100644
index 00000000000..3747bce7722
Binary files /dev/null and b/regression-test/data/insert_p0/group_commit/test_group_commit_schema_change.out differ
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 7f785a3292f..6e9a89aa0f7 100644
--- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -73,4 +73,23 @@ suite("test_group_commit_error", "nonConcurrent") {
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}
+
+ try {
+        GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.block")
+ Thread thread = new Thread(() -> {
+ sql """ set group_commit = async_mode """
+ sql """ insert into ${tableName} values (5, 4) """
+ })
+ thread.start()
+ sleep(4000)
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ thread.join()
+ def result = sql "select count(*) from ${tableName}"
+ logger.info("rowCount 0: ${result}")
+ } catch (Exception e) {
+ logger.warn("unexpected failed: " + e.getMessage())
+ assertTrue(false, "unexpected failed: " + e.getMessage())
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
}
\ No newline at end of file
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
index 2858d1e4f51..0fbc3ec0a8d 100644
--- a/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_replay_wal.groovy
@@ -37,6 +37,10 @@ suite("test_group_commit_replay_wal", "nonConcurrent") {
`k` int ,
`v` int ,
) engine=olap
+ PARTITION BY LIST(k) (
+ PARTITION p1 VALUES IN ("1","2","3","4"),
+ PARTITION p2 VALUES IN ("5")
+ )
DISTRIBUTED BY HASH(`k`)
BUCKETS 5
properties("replication_num" = "1", "group_commit_interval_ms"="2000")
@@ -86,9 +90,18 @@ suite("test_group_commit_replay_wal", "nonConcurrent") {
sleep(4000) // wal replay but all failed
getRowCount(5)
// check wal count is 1
+ sql """ ALTER TABLE ${tableName} DROP PARTITION p2 """
+ for (int i = 0; i < 10; i++) {
+            List<List<Object>> partitions = sql "show partitions from ${tableName};"
+ logger.info("partitions: ${partitions}")
+ if (partitions.size() == 1) {
+ break
+ }
+ sleep(100)
+ }
GetDebugPoint().clearDebugPointsForAllFEs()
- getRowCount(10)
+ getRowCount(8)
// check wal count is 0
} catch (Exception e) {
logger.info("failed: " + e.getMessage())
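
The changes to this replay test pair with the max_filter_ratio change in
wal_table.cpp: the table is now list-partitioned, partition p2 is dropped
while a WAL is still pending, and the expected row count after replay
drops from 10 to 8, evidently because the rows destined for the dropped
partition are filtered during replay instead of failing it.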
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy
new file mode 100644
index 00000000000..06bbbebef5b
--- /dev/null
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_schema_change.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovyjarjarantlr4.v4.codegen.model.ExceptionClause
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_group_commit_schema_change", "nonConcurrent") {
+ def tableName3 = "test_group_commit_schema_change"
+
+ onFinish {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+
+ def getJobState = { tableName ->
+        def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+ logger.info("jobStateResult: ${jobStateResult}")
+ return jobStateResult[0][9]
+ }
+
+ def getRowCount = { expectedRowCount ->
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
+ {
+ def result = sql "select count(*) from ${tableName3}"
+ logger.info("table: ${tableName3}, rowCount: ${result}")
+ return result[0][0] == expectedRowCount
+ }
+ )
+ }
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ sql """ DROP TABLE IF EXISTS ${tableName3} """
+ sql """
+ CREATE TABLE ${tableName3} (
+ `id` int(11) NOT NULL,
+ `name` varchar(50) NULL,
+ `score` varchar(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "200"
+ );
+ """
+
+    GetDebugPoint().enableDebugPointForAllFEs("FE.FrontendServiceImpl.initHttpStreamPlan.block")
+    GetDebugPoint().enableDebugPointForAllFEs("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")
+
+ // write data
+ Thread thread = new Thread(() -> {
+ sql """ set group_commit = async_mode; """
+ for (int i = 0; i < 10; i++) {
+ try {
+ sql """ insert into ${tableName3} values (1, 'a', 100) """
+ break
+ } catch (Exception e) {
+ logger.info("insert error: ${e}")
+ if (e.getMessage().contains("schema version not match")) {
+ continue
+ } else {
+ throw e
+ }
+ }
+ }
+ })
+ thread.start()
+ sleep(1000)
+ def result = sql "select count(*) from ${tableName3}"
+ logger.info("rowCount 0: ${result}")
+ assertEquals(0, result[0][0])
+
+ // schema change
+ sql """ alter table ${tableName3} modify column score int NULL"""
+    GetDebugPoint().enableDebugPointForAllFEs("FE.SchemaChangeJobV2.runRunning.block")
+    GetDebugPoint().disableDebugPointForAllFEs("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")
+ for (int i = 0; i < 10; i++) {
+ def job_state = getJobState(tableName3)
+ if (job_state == "RUNNING") {
+ break
+ }
+ sleep(100)
+ }
+
+    GetDebugPoint().disableDebugPointForAllFEs("FE.FrontendServiceImpl.initHttpStreamPlan.block")
+ thread.join()
+ getRowCount(1)
+ qt_sql """ select id, name, score from ${tableName3} """
+ def job_state = getJobState(tableName3)
+ assertEquals("RUNNING", job_state)
+    GetDebugPoint().disableDebugPointForAllFEs("FE.SchemaChangeJobV2.runRunning.block")
+ for (int i = 0; i < 10; i++) {
+ job_state = getJobState(tableName3)
+ if (job_state == "FINISHED") {
+ break
+ }
+ sleep(100)
+ }
+ assertEquals("FINISHED", job_state)
+}
+
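
Reading the new suite end to end: an async group commit insert is started
while FE.FrontendServiceImpl.initHttpStreamPlan.block holds its plan; a
schema change is launched and frozen, first before addShadowIndexToCatalog
and then inside runRunning; the insert is released in between and retries
on "schema version not match" errors; finally the test asserts the row
landed exactly once (getRowCount(1)) while the job is still RUNNING, then
lets the job reach FINISHED.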
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]