This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8e7454fc2a7 [improvement](binlog)Support inverted index format v2 in
CCR (#33415)
8e7454fc2a7 is described below
commit 8e7454fc2a7ef1bf862eddf1a46ff935648af12d
Author: qiye <[email protected]>
AuthorDate: Mon Apr 15 09:13:13 2024 +0800
[improvement](binlog)Support inverted index format v2 in CCR (#33415)
---
be/src/olap/rowset/beta_rowset.cpp | 49 +++--
be/src/olap/snapshot_manager.cpp | 57 +++--
be/src/olap/tablet.cpp | 19 +-
be/src/service/backend_service.cpp | 104 ++++++---
.../apache/doris/regression/suite/Syncer.groovy | 10 +-
.../test_binlog_config_change.groovy | 217 +++++++++++++++++++
.../inverted_index/test_get_binlog.groovy | 239 +++++++++++++++++++++
.../inverted_index/test_ingest_binlog.groovy | 223 +++++++++++++++++++
.../inverted_index/test_multi_buckets.groovy | 180 ++++++++++++++++
.../inverted_index/test_backup_restore.groovy | 196 +++++++++++++++++
10 files changed, 1220 insertions(+), 74 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 405cfb15af7..ac97fe0f28d 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -576,24 +576,41 @@ Status BetaRowset::add_to_binlog() {
}
linked_success_files.push_back(binlog_file);
- for (const auto& index : _schema->indexes()) {
- if (index.index_type() != IndexType::INVERTED) {
- continue;
+ if (_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
+ for (const auto& index : _schema->indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ auto index_file = InvertedIndexDescriptor::get_index_file_name(
+ seg_file, index_id, index.get_index_suffix());
+ auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+
std::filesystem::path(index_file).filename())
+ .string();
+ VLOG_DEBUG << "link " << index_file << " to " <<
binlog_index_file;
+ if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
+ status = Status::Error<OS_ERROR>(
+ "fail to create hard link. from={}, to={},
errno={}", index_file,
+ binlog_index_file, Errno::no());
+ return status;
+ }
+ linked_success_files.push_back(binlog_index_file);
}
- auto index_id = index.index_id();
- auto index_file = InvertedIndexDescriptor::get_index_file_name(
- seg_file, index_id, index.get_index_suffix());
- auto binlog_index_file = (std::filesystem::path(binlog_dir) /
-
std::filesystem::path(index_file).filename())
- .string();
- VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
- if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
- status = Status::Error<OS_ERROR>(
- "fail to create hard link. from={}, to={}, errno={}",
index_file,
- binlog_index_file, Errno::no());
- return status;
+ } else {
+ if (_schema->has_inverted_index()) {
+ auto index_file =
InvertedIndexDescriptor::get_index_file_name(seg_file);
+ auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+
std::filesystem::path(index_file).filename())
+ .string();
+ VLOG_DEBUG << "link " << index_file << " to " <<
binlog_index_file;
+ if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
+ status = Status::Error<OS_ERROR>(
+ "fail to create hard link. from={}, to={},
errno={}", index_file,
+ binlog_index_file, Errno::no());
+ return status;
+ }
+ linked_success_files.push_back(binlog_index_file);
}
- linked_success_files.push_back(binlog_index_file);
}
}
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 1c06414a311..f19abfb0d28 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -663,26 +663,47 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
}
linked_success_files.push_back(snapshot_segment_file_path);
- for (const auto& index : tablet_schema.indexes()) {
- if (index.index_type() != IndexType::INVERTED) {
- continue;
+ if (tablet_schema.get_inverted_index_storage_format() ==
+ InvertedIndexStorageFormatPB::V1) {
+ for (const auto& index : tablet_schema.indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ auto index_file =
ref_tablet->get_segment_index_filepath(
+ rowset_id, segment_index, index_id);
+ auto snapshot_segment_index_file_path =
+ fmt::format("{}/{}_{}_{}.binlog-index",
schema_full_path, rowset_id,
+ segment_index, index_id);
+ VLOG_DEBUG << "link " << index_file << " to "
+ << snapshot_segment_index_file_path;
+ res = io::global_local_filesystem()->link_file(
+ index_file, snapshot_segment_index_file_path);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to link binlog index file.
[src=" << index_file
+ << ", dest=" <<
snapshot_segment_index_file_path << "]";
+ break;
+ }
+
linked_success_files.push_back(snapshot_segment_index_file_path);
}
- auto index_id = index.index_id();
- auto index_file = ref_tablet->get_segment_index_filepath(
- rowset_id, segment_index, index_id);
- auto snapshot_segment_index_file_path =
- fmt::format("{}/{}_{}_{}.binlog-index",
schema_full_path, rowset_id,
- segment_index, index_id);
- VLOG_DEBUG << "link " << index_file << " to "
- << snapshot_segment_index_file_path;
- res = io::global_local_filesystem()->link_file(
- index_file, snapshot_segment_index_file_path);
- if (!res.ok()) {
- LOG(WARNING) << "fail to link binlog index file.
[src=" << index_file
- << ", dest=" <<
snapshot_segment_index_file_path << "]";
- break;
+ } else {
+ if (tablet_schema.has_inverted_index()) {
+ auto index_file =
+
InvertedIndexDescriptor::get_index_file_name(segment_file_path);
+ auto snapshot_segment_index_file_path =
+ fmt::format("{}/{}_{}.binlog-index",
schema_full_path, rowset_id,
+ segment_index);
+ VLOG_DEBUG << "link " << index_file << " to "
+ << snapshot_segment_index_file_path;
+ res = io::global_local_filesystem()->link_file(
+ index_file, snapshot_segment_index_file_path);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to link binlog index file.
[src=" << index_file
+ << ", dest=" <<
snapshot_segment_index_file_path << "]";
+ break;
+ }
+
linked_success_files.push_back(snapshot_segment_index_file_path);
}
-
linked_success_files.push_back(snapshot_segment_index_file_path);
}
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6734bc70063..b97fdfe8e72 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2390,14 +2390,25 @@ std::string
Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
std::string_view segment_index,
std::string_view index_id)
const {
- // TODO(qiye): support inverted index file format v2, when
https://github.com/apache/doris/pull/30145 is merged
- return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
segment_index, index_id);
+ auto format =
_tablet_meta->tablet_schema()->get_inverted_index_storage_format(); // NOTE(review): format is read from the tablet's current schema, not from the rowset whose file is being named
+ if (format == doris::InvertedIndexStorageFormatPB::V1) { // V1: one .idx file per (segment, index_id) under _binlog/
+ return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
segment_index,
index_id);
+ } else { // V2: one .idx file per segment; index_id is ignored in this branch
+ return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id,
segment_index);
+ }
}
std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
int64_t segment_index,
int64_t index_id) const {
- // TODO(qiye): support inverted index file format v2, when
https://github.com/apache/doris/pull/30145 is merged
- return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
segment_index, index_id);
+ auto format =
_tablet_meta->tablet_schema()->get_inverted_index_storage_format(); // NOTE(review): format is read from the tablet's current schema, not from the rowset whose file is being named
+ if (format == doris::InvertedIndexStorageFormatPB::V1) { // V1: one .idx file per (segment, index_id) under _binlog/
+ return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
segment_index,
index_id);
+ } else { // V2: one .idx file per segment; index_id does not appear in the name
+ DCHECK(index_id == -1); // V2 callers are expected to pass -1 (checked in debug builds only)
+ return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id,
segment_index);
+ }
}
std::vector<std::string> Tablet::get_binlog_filepath(std::string_view
binlog_version) const {
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index c61e0b86556..b1a110144ef 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -307,41 +307,81 @@ void _ingest_binlog(StorageEngine& engine,
IngestBinlogArg* arg) {
std::vector<uint64_t> segment_index_file_sizes;
std::vector<std::string> segment_index_file_names;
auto tablet_schema = rowset_meta->tablet_schema();
- for (const auto& index : tablet_schema->indexes()) {
- if (index.index_type() != IndexType::INVERTED) {
- continue;
+ if (tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
+ for (const auto& index : tablet_schema->indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ auto get_segment_index_file_size_url = fmt::format(
+
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+ "}",
+ binlog_api_url, "get_segment_index_file",
request.remote_tablet_id,
+ remote_rowset_id, segment_index, index_id);
+ uint64_t segment_index_file_size;
+ auto get_segment_index_file_size_cb =
+ [&get_segment_index_file_size_url,
+ &segment_index_file_size](HttpClient* client) {
+
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ RETURN_IF_ERROR(client->head());
+ return
client->get_content_length(&segment_index_file_size);
+ };
+ auto index_file =
InvertedIndexDescriptor::inverted_index_file_path(
+ local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index,
+ index_id, index.get_index_suffix());
+ segment_index_file_names.push_back(index_file);
+
+ status = HttpClient::execute_with_retry(max_retry, 1,
+
get_segment_index_file_size_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment file size from "
+ << get_segment_index_file_size_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ segment_index_file_sizes.push_back(segment_index_file_size);
+
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+ }
}
- auto index_id = index.index_id();
+ } else {
for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
- auto get_segment_index_file_size_url = fmt::format(
-
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
- "}",
- binlog_api_url, "get_segment_index_file",
request.remote_tablet_id,
- remote_rowset_id, segment_index, index_id);
- uint64_t segment_index_file_size;
- auto get_segment_index_file_size_cb =
[&get_segment_index_file_size_url,
-
&segment_index_file_size](HttpClient* client) {
- RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
- RETURN_IF_ERROR(client->head());
- return client->get_content_length(&segment_index_file_size);
- };
- auto index_file =
InvertedIndexDescriptor::inverted_index_file_path(
- local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index, index_id,
- index.get_index_suffix());
- segment_index_file_names.push_back(index_file);
-
- status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_index_file_size_cb);
- if (!status.ok()) {
- LOG(WARNING) << "failed to get segment file size from "
- << get_segment_index_file_size_url
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
- return;
- }
+ if (tablet_schema->has_inverted_index()) {
+ auto get_segment_index_file_size_url = fmt::format(
+
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+ "}",
+ binlog_api_url, "get_segment_index_file",
request.remote_tablet_id,
+ remote_rowset_id, segment_index, -1);
+ uint64_t segment_index_file_size;
+ auto get_segment_index_file_size_cb =
+ [&get_segment_index_file_size_url,
+ &segment_index_file_size](HttpClient* client) {
+
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ RETURN_IF_ERROR(client->head());
+ return
client->get_content_length(&segment_index_file_size);
+ };
+ auto local_segment_path = BetaRowset::segment_file_path(
+ local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index);
+ auto index_file =
InvertedIndexDescriptor::get_index_file_name(local_segment_path);
+ segment_index_file_names.push_back(index_file);
+
+ status = HttpClient::execute_with_retry(max_retry, 1,
+
get_segment_index_file_size_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment file size from "
+ << get_segment_index_file_size_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
- segment_index_file_sizes.push_back(segment_index_file_size);
-
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+ segment_index_file_sizes.push_back(segment_index_file_size);
+
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+ }
}
}
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 47000ab74fd..874a0e5c0be 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -427,7 +427,9 @@ class Syncer {
Boolean checkRestoreFinish() {
String checkSQL = "SHOW RESTORE FROM TEST_" + context.db
- List<Object> row = suite.sql(checkSQL)[0]
+ int size = suite.sql(checkSQL).size()
+ logger.info("Now size is ${size}")
+ List<Object> row = suite.sql(checkSQL)[size-1]
logger.info("Now row is ${row}")
return (row[4] as String) == "FINISHED"
@@ -645,9 +647,9 @@ class Syncer {
// step 2: get partitionIds
metaMap.values().forEach {
- baseSql += "/" + it.id.toString() + "/partitions"
+ def partitionSql = baseSql + "/" + it.id.toString() + "/partitions"
Map<Long, Long> partitionInfo = Maps.newHashMap()
- sqlInfo = sendSql.call(baseSql, toSrc)
+ sqlInfo = sendSql.call(partitionSql, toSrc)
for (List<Object> row : sqlInfo) {
partitionInfo.put(row[0] as Long, row[2] as Long)
}
@@ -660,7 +662,7 @@ class Syncer {
for (Entry<Long, Long> info : partitionInfo) {
// step 3.1: get partition/indexId
- String partitionSQl = baseSql + "/" + info.key.toString()
+ String partitionSQl = partitionSql + "/" + info.key.toString()
sqlInfo = sendSql.call(partitionSQl, toSrc)
if (sqlInfo.isEmpty()) {
logger.error("Target cluster partition-${info.key} indexId
fault.")
diff --git
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
new file mode 100644
index 00000000000..cccf25780d7
--- /dev/null
+++
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_binlog_config_change_index") { // CCR check: rows ingested from binlog must match the source for V1/V2 inverted-index tables (DUPLICATE and MOW)
+
+ def syncer = getSyncer()
+ if (!syncer.checkEnableFeatureBinlog()) {
+ logger.info("fe enable_feature_binlog is false, skip case
test_binlog_config_change_index")
+ return
+ }
+ def insert_data = { tableName -> // six single-row inserts; ids 1, 2 and 3 each appear twice
+ [
+ """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100);""",
+ """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear",
99);""",
+ """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple",
100);""",
+ """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear",
99);""",
+ """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple",
100);""",
+ """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear",
99);"""
+ ]
+ }
+
+ def sqls = { tableName -> // queries compared across clusters; MATCH predicates target the indexed name/hobbies columns
+ [
+ """ select * from ${tableName} order by id, name, hobbies, score
""",
+ """ select * from ${tableName} where name match "andy" order by
id, name, hobbies, score """,
+ """ select * from ${tableName} where hobbies match "pear" order by
id, name, hobbies, score """,
+ """ select * from ${tableName} where score < 100 order by id,
name, hobbies, score """
+ ]
+ }
+
+ def run_sql = { tableName -> // asserts each query returns the same result on source (sql) and target (target_sql)
+ sqls(tableName).each { sqlStatement ->
+ def target_res = target_sql sqlStatement
+ def res = sql sqlStatement
+ assertEquals(res, target_res)
+ }
+ }
+
+ def create_table_v1 = { tableName -> // DUPLICATE KEY table without inverted_index_storage_format (the "v1" case in this suite)
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1");
+ """
+ }
+
+ def create_table_v2 = { tableName -> // same DUPLICATE KEY table with "inverted_index_storage_format" = "V2"
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format"
= "V2");
+ """
+ }
+
+ def create_table_mow_v1 = { tableName -> // UNIQUE KEY merge-on-write table, binlog enabled at creation, no storage-format property
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "binlog.enable" = "true",
+ "enable_unique_key_merge_on_write" = "true");
+ """
+ }
+
+ def create_table_mow_v2 = { tableName -> // merge-on-write table with "inverted_index_storage_format" = "V2"
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "binlog.enable" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "inverted_index_storage_format" = "V2");
+ """
+ }
+
+ def run_test = {create_table, tableName -> // recreate the table on both clusters, replay every insert through the syncer, then compare
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql create_table
+ sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+ target_sql "DROP TABLE IF EXISTS ${tableName}"
+ target_sql create_table
+
+ assertTrue(syncer.getTargetMeta("${tableName}"))
+
+ // test 1: target cluster follow source cluster
+ logger.info("=== Test 1: Target cluster follow source cluster case
===")
+
+ insert_data(tableName).each { sqlStatement -> // one full get/begin/ingest/commit cycle per inserted row
+ sql sqlStatement
+ assertTrue(syncer.getBinlog("${tableName}"))
+ assertTrue(syncer.beginTxn("${tableName}"))
+ assertTrue(syncer.getBackendClients())
+ assertTrue(syncer.ingestBinlog())
+ assertTrue(syncer.commitTxn())
+ assertTrue(syncer.checkTargetVersion())
+ syncer.closeBackendClients()
+ }
+
+ target_sql " sync "
+ def res = target_sql """SELECT * FROM ${tableName}"""
+ if (tableName.contains("mow")) { // UNIQUE KEY(id) keeps one row per id: 3 of the 6 inserted rows
+ assertEquals(res.size(), insert_data(tableName).size() / 2 as
Integer)
+ } else {
+ assertEquals(res.size(), insert_data(tableName).size())
+ }
+ run_sql(tableName)
+ }
+
+
+ // inverted index format v1
+ logger.info("=== Test 1: Inverted index format v1 case ===")
+ def tableName = "tbl_binlog_config_change_index_v1"
+ run_test.call(create_table_v1(tableName), tableName)
+
+ // inverted index format v2
+ logger.info("=== Test 2: Inverted index format v2 case ===")
+ tableName = "tbl_binlog_config_change_index_v2"
+ run_test.call(create_table_v2(tableName), tableName)
+
+ // inverted index format v1 with mow
+ logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+ tableName = "tbl_binlog_config_change_index_mow_v1"
+ run_test.call(create_table_mow_v1(tableName), tableName)
+
+ // inverted index format v2 with mow
+ logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+ tableName = "tbl_binlog_config_change_index_mow_v2"
+ run_test.call(create_table_mow_v2(tableName), tableName)
+
+ // TODO: bugfix -- NOTE(review): the disabled block below uses res, test_num and insert_num, none of which are defined at suite scope
+ // test 2: source cluster disable and re-enable binlog
+ // target_sql "DROP TABLE IF EXISTS ${tableName}"
+ // target_sql """
+ // CREATE TABLE if NOT EXISTS ${tableName}
+ // (
+ // `test` INT,
+ // `id` INT
+ // )
+ // ENGINE=OLAP
+ // UNIQUE KEY(`test`, `id`)
+ // DISTRIBUTED BY HASH(id) BUCKETS 1
+ // PROPERTIES (
+ // "replication_allocation" = "tag.location.default: 1"
+ // )
+ // """
+ // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "false")"""
+ // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+ // syncer.context.seq = -1
+
+ // assertTrue(syncer.getBinlog("${tableName}"))
+ // assertTrue(syncer.beginTxn("${tableName}"))
+ // assertTrue(syncer.ingestBinlog())
+ // assertTrue(syncer.commitTxn())
+ // assertTrue(syncer.checkTargetVersion())
+
+ // res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+ // assertTrue(res.size() == insert_num)
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_get_binlog.groovy
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_get_binlog.groovy
new file mode 100644
index 00000000000..b837f799e58
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/inverted_index/test_get_binlog.groovy
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_get_binlog_case_index") { // getBinlog seq handling and user-privilege checks on V1/V2 inverted-index tables
+ def syncer = getSyncer()
+ if (!syncer.checkEnableFeatureBinlog()) {
+ logger.info("fe enable_feature_binlog is false, skip case
test_get_binlog_case_index")
+ return
+ }
+
+ def insert_data = { tableName -> // five inserts issued after the initial row that run_test writes itself
+ [
+ """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear",
99);""",
+ """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple",
100);""",
+ """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear",
99);""",
+ """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple",
100);""",
+ """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear",
99);"""
+ ]
+ }
+
+ def sqls = { tableName -> // NOTE(review): only referenced by run_sql, which this suite never calls
+ [
+ """ select * from ${tableName} order by id, name, hobbies, score
""",
+ """ select * from ${tableName} where name match "andy" order by
id, name, hobbies, score """,
+ """ select * from ${tableName} where hobbies match "pear" order by
id, name, hobbies, score """,
+ """ select * from ${tableName} where score < 100 order by id,
name, hobbies, score """
+ ]
+ }
+
+ def run_sql = { tableName -> // NOTE(review): defined but never invoked in this suite
+ sqls(tableName).each { sqlStatement ->
+ def target_res = target_sql sqlStatement
+ def res = sql sqlStatement
+ assertEquals(res, target_res)
+ }
+ }
+
+ def create_table_v1 = { tableName -> // DUPLICATE KEY table without inverted_index_storage_format (the "v1" case in this suite)
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1");
+ """
+ }
+
+ def create_table_v2 = { tableName -> // same DUPLICATE KEY table with "inverted_index_storage_format" = "V2"
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format"
= "V2");
+ """
+ }
+
+ def create_table_mow_v1 = { tableName -> // UNIQUE KEY merge-on-write table, binlog enabled at creation, no storage-format property
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "binlog.enable" = "true",
+ "enable_unique_key_merge_on_write" = "true");
+ """
+ }
+
+ def create_table_mow_v2 = { tableName -> // merge-on-write table with "inverted_index_storage_format" = "V2"
+ """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "binlog.enable" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "inverted_index_storage_format" = "V2");
+ """
+ }
+
+ def run_test = { create_table, tableName -> // probes getBinlog with normal, too-old, too-new, unknown-table and mid-range seq values
+ long seq = -1 // NOTE(review): unused local; the seq under test lives in syncer.context.seq
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql create_table
+ sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+ sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100); """
+ assertTrue(syncer.getBinlog("${tableName}"))
+ long firstSeq = syncer.context.seq // seq reached after fetching the binlog of the first insert
+
+ logger.info("=== Test 1: normal case ===")
+ insert_data(tableName).each { sqlStatement ->
+ sql sqlStatement
+ assertTrue(syncer.getBinlog("${tableName}"))
+ }
+
+ long endSeq = syncer.context.seq
+
+ logger.info("=== Test 2: Abnormal seq case ===")
+ logger.info("=== Test 2.1: too old seq case ===")
+ syncer.context.seq = -1 // a seq older than any binlog; the assertions below expect to land back on firstSeq
+ assertTrue(syncer.context.seq == -1)
+ assertTrue(syncer.getBinlog("${tableName}"))
+ assertTrue(syncer.context.seq == firstSeq)
+
+
+ logger.info("=== Test 2.2: too new seq case ===")
+ syncer.context.seq = endSeq + 100 // beyond the newest binlog: getBinlog is expected to return false
+ assertTrue((syncer.getBinlog("${tableName}")) == false)
+
+
+ logger.info("=== Test 2.3: not find table case ===")
+ assertTrue(syncer.getBinlog("this_is_an_invalid_tbl") == false)
+
+
+ logger.info("=== Test 2.4: seq between first and end case ===")
+ long midSeq = (firstSeq + endSeq) / 2 // Groovy '/' on integral operands yields a BigDecimal, narrowed to long on assignment
+ syncer.context.seq = midSeq
+ assertTrue(syncer.getBinlog("${tableName}"))
+ long test5Seq = syncer.context.seq
+ assertTrue(firstSeq <= test5Seq && test5Seq <= endSeq)
+
+ logger.info("=== Test 3: Get binlog with different priv user case ===")
+ logger.info("=== Test 3.1: read only user get binlog case ===")
+ // TODO: bugfix -- NOTE(review): Test 3.1 is only logged; its body below is disabled
+ // syncer.context.seq = -1
+ // readOnlyUser = "read_only_user"
+ // sql """DROP USER IF EXISTS ${readOnlyUser}"""
+ // sql """CREATE USER ${readOnlyUser} IDENTIFIED BY '123456'"""
+ // sql """GRANT ALL ON ${context.config.defaultDb}.* TO
${readOnlyUser}"""
+ // sql """GRANT SELECT_PRIV ON TEST_${context.dbName}.${tableName} TO
${readOnlyUser}"""
+ // syncer.context.user = "${readOnlyUser}"
+ // syncer.context.passwd = "123456"
+ // assertTrue(syncer.getBinlog("${tableName}"))
+
+ }
+
+ // inverted index format v1
+ logger.info("=== Test 1: Inverted index format v1 case ===")
+ def tableName = "tbl_get_binlog_case_index_v1"
+ run_test.call(create_table_v1(tableName), tableName)
+
+ // inverted index format v2
+ logger.info("=== Test 2: Inverted index format v2 case ===")
+ tableName = "tbl_get_binlog_case_index_v2"
+ run_test.call(create_table_v2(tableName), tableName)
+
+ // inverted index format v1 with mow
+ logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+ tableName = "tbl_get_binlog_case_index_mow_v1"
+ run_test.call(create_table_mow_v1(tableName), tableName)
+
+ // inverted index format v2 with mow
+ logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+ tableName = "tbl_get_binlog_case_index_mow_v2"
+ run_test.call(create_table_mow_v2(tableName), tableName)
+
+ logger.info("=== Test 3: no priv user get binlog case ===") // NOTE(review): label reuses "Test 3" after Tests 1-4 above
+ syncer.context.seq = -1
+ def noPrivUser = "no_priv_user2"
+ def emptyTable = "tbl_empty_test"
+ sql "DROP TABLE IF EXISTS ${emptyTable}"
+ sql """
+ CREATE TABLE ${emptyTable} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL,
+ index index_name (name) using inverted,
+ index index_hobbies (hobbies) using inverted
properties("parser"="english"),
+ index index_score (score) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1");
+ """
+ sql """CREATE USER IF NOT EXISTS ${noPrivUser} IDENTIFIED BY '123456'"""
+ sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}"""
+ sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO
${noPrivUser}"""
+ syncer.context.user = "${noPrivUser}"
+ syncer.context.passwd = "123456"
+ assertTrue((syncer.getBinlog("${tableName}")) == false) // tableName still holds the last run_test table (..._mow_v2), not emptyTable
+
+
+ logger.info("=== Test 3.3: Non-existent user set in syncer get binlog case
===")
+ syncer.context.user = "this_is_an_invalid_user"
+ syncer.context.passwd = "this_is_an_invalid_user" // NOTE(review): syncer credentials are not restored before the suite ends
+ assertTrue(syncer.getBinlog("${tableName}", false) == false)
+}
\ No newline at end of file
diff --git
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_ingest_binlog.groovy
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_ingest_binlog.groovy
new file mode 100644
index 00000000000..12ba49e084d
--- /dev/null
+++
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_ingest_binlog.groovy
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_ingest_binlog_index") {
+
+    // Replays single-row inserts into a target table through the syncer
+    // (getBinlog -> beginTxn -> getBackendClients -> ingestBinlog -> commitTxn)
+    // and checks source and target agree, for tables with inverted indexes in
+    // storage format V1 or V2, using DUPLICATE KEY and merge-on-write UNIQUE KEY.
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case test_ingest_binlog_index")
+        return
+    }
+
+    // Six single-row inserts; every id is written twice with different values.
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
+        ]
+    }
+
+    // Rows expected after all inserts: merge-on-write tables (name contains
+    // "mow") keep one row per id, i.e. half of the inserted rows.
+    def expected_row_count = { tableName ->
+        def total = insert_data(tableName).size()
+        return tableName.contains("mow") ? (total / 2 as Integer) : total
+    }
+
+    // Queries compared between source and target; the MATCH predicates target
+    // the inverted-indexed columns `name` and `hobbies`.
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score """,
+            """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """
+        ]
+    }
+
+    // Asserts every query from sqls() returns identical results on source and target.
+    def run_sql = { tableName ->
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    // DDL builders. The *_v1 variants set "inverted_index_storage_format" = "V1"
+    // explicitly instead of relying on the default storage format, so the V1
+    // cases keep exercising V1.
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V1");
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V1");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    // Creates the table on source and target, enables binlog on the source,
+    // then runs the ingest scenarios against it.
+    def run_test = { create_table, tableName ->
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+        sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+        target_sql "DROP TABLE IF EXISTS ${tableName}"
+        target_sql create_table
+        assertTrue(syncer.getTargetMeta("${tableName}"))
+
+        logger.info("=== Test 1: Common ingest binlog case ===")
+        // Each source insert is replayed on the target as its own transaction.
+        insert_data.call(tableName).each { sqlStatement ->
+            sql sqlStatement
+            assertTrue(syncer.getBinlog("${tableName}"))
+            assertTrue(syncer.beginTxn("${tableName}"))
+            assertTrue(syncer.getBackendClients())
+            assertTrue(syncer.ingestBinlog())
+            assertTrue(syncer.commitTxn())
+            assertTrue(syncer.checkTargetVersion())
+            syncer.closeBackendClients()
+        }
+
+        target_sql " sync "
+        // `def` makes `res` local to this closure; without it the assignment
+        // resolves through the closure's owner/delegate.
+        def res = target_sql """SELECT * FROM ${tableName}"""
+        assertEquals(res.size(), expected_row_count(tableName))
+        run_sql.call(tableName)
+
+        logger.info("=== Test 2: Wrong IngestBinlogRequest case ===")
+        sql """INSERT INTO ${tableName} VALUES (4, "bason", "bason hate pear", 99);"""
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.beginTxn("${tableName}"))
+        assertTrue(syncer.getBackendClients())
+
+
+        logger.info("=== Test 2.1: Wrong txnId case ===")
+        // TODO: bugfix
+        // def originTxnId = syncer.context.txnId
+        // syncer.context.txnId = -1
+        // assertTrue(syncer.ingestBinlog() == false)
+        // syncer.context.txnId = originTxnId
+
+
+        logger.info("=== Test 2.2: Wrong binlog version case ===")
+        // -1 means use the number of syncer.context
+        // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
+        // use fakeVersion = 1, 1 is doris be tablet first version, so no binlog, only http error
+        assertTrue(syncer.ingestBinlog(-1, 1) == false)
+
+
+        logger.info("=== Test 2.3: Wrong partitionId case ===")
+        // TODO: bugfix
+        // assertTrue(syncer.ingestBinlog(1, -1) == false)
+
+
+        logger.info("=== Test 2.4: Right case ===")
+        assertTrue(syncer.ingestBinlog())
+        assertTrue(syncer.commitTxn())
+        assertTrue(syncer.checkTargetVersion())
+        target_sql " sync "
+        res = target_sql """SELECT * FROM ${tableName} WHERE id=4"""
+        assertEquals(res.size(), 1)
+
+
+        // End Test 2
+        syncer.closeBackendClients()
+    }
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_ingest_binlog_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_ingest_binlog_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_ingest_binlog_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_ingest_binlog_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+
+}
diff --git
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_multi_buckets.groovy
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_multi_buckets.groovy
new file mode 100644
index 00000000000..4a7b7263af2
--- /dev/null
+++
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_multi_buckets.groovy
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multi_buckets_index") {
+
+    // Same binlog ingest flow as the single-bucket case, but on tables hashed
+    // into 3 buckets, for inverted index storage formats V1 and V2 with
+    // DUPLICATE KEY and merge-on-write UNIQUE KEY models.
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case test_multi_buckets_index")
+        return
+    }
+
+    // Six single-row inserts; every id is written twice with different values.
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
+        ]
+    }
+
+    // Rows expected after all inserts: merge-on-write tables (name contains
+    // "mow") keep one row per id, i.e. half of the inserted rows.
+    def expected_row_count = { tableName ->
+        def total = insert_data(tableName).size()
+        return tableName.contains("mow") ? (total / 2 as Integer) : total
+    }
+
+    // Queries compared between source and target; the MATCH predicates target
+    // the inverted-indexed columns `name` and `hobbies`.
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score """,
+            """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """
+        ]
+    }
+
+    // Asserts every query from sqls() returns identical results on source and target.
+    def run_sql = { tableName ->
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    // DDL builders (3 buckets). The *_v1 variants set
+    // "inverted_index_storage_format" = "V1" explicitly instead of relying on
+    // the default storage format, so the V1 cases keep exercising V1.
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V1");
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V1");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    // Creates the table on source and target, replays every insert on the
+    // target via ingestBinlog, then compares row counts and query results.
+    def run_test = { create_table, tableName ->
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+        sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+        target_sql "DROP TABLE IF EXISTS ${tableName}"
+        target_sql create_table
+        assertTrue(syncer.getTargetMeta("${tableName}"))
+
+        // Each source insert is replayed on the target as its own transaction.
+        insert_data(tableName).each { sqlStatement ->
+            sql sqlStatement
+            assertTrue(syncer.getBinlog("${tableName}"))
+            assertTrue(syncer.beginTxn("${tableName}"))
+            assertTrue(syncer.getBackendClients())
+            assertTrue(syncer.ingestBinlog())
+            assertTrue(syncer.commitTxn())
+            assertTrue(syncer.checkTargetVersion())
+            syncer.closeBackendClients()
+        }
+
+        target_sql " sync "
+        def res = target_sql """SELECT * FROM ${tableName}"""
+        assertEquals(res.size(), expected_row_count(tableName))
+        run_sql(tableName)
+    }
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_multi_buckets_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_multi_buckets_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_multi_buckets_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_multi_buckets_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+}
diff --git
a/regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy
b/regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy
new file mode 100644
index 00000000000..86f4cc89913
--- /dev/null
+++
b/regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_index") {
+
+    // Backs up a table with inverted indexes (storage format V1 or V2,
+    // DUPLICATE KEY or merge-on-write UNIQUE KEY), restores the snapshot on
+    // the target through the syncer, and checks source and target agree.
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case test_backup_restore_index")
+        return
+    }
+
+    // Six single-row inserts; every id is written twice with different values.
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
+        ]
+    }
+
+    // Rows expected after all inserts: merge-on-write tables (name contains
+    // "mow") keep one row per id, i.e. half of the inserted rows.
+    def expected_row_count = { tableName ->
+        def total = insert_data(tableName).size()
+        return tableName.contains("mow") ? (total / 2 as Integer) : total
+    }
+
+    // Queries compared between source and target; the MATCH predicates target
+    // the inverted-indexed columns `name` and `hobbies`.
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score """,
+            """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """
+        ]
+    }
+
+    // Asserts every query from sqls() returns identical results on source and target.
+    def run_sql = { tableName ->
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    // DDL builders. The *_v1 variants set "inverted_index_storage_format" = "V1"
+    // explicitly instead of relying on the default storage format, so the V1
+    // cases keep exercising V1.
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "inverted_index_storage_format" = "V1"
+        );
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "inverted_index_storage_format" = "V2"
+        );
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V1");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "binlog.enable" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    // Loads data into a fresh source table, takes a full BACKUP to
+    // `__keep_on_local__`, restores that snapshot on the target via the
+    // syncer, then compares row counts and query results.
+    def run_test = { create_table, tableName ->
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+
+        logger.info("=== Test 1: Common backup and restore ===")
+        def snapshotName = "snapshot_test_" + tableName
+        insert_data(tableName).each { sqlStatement ->
+            sql sqlStatement
+        }
+        sql " sync "
+        def res = sql "SELECT * FROM ${tableName}"
+        assertEquals(res.size(), expected_row_count(tableName))
+
+        sql """
+            BACKUP SNAPSHOT ${context.dbName}.${snapshotName}
+            TO `__keep_on_local__`
+            ON (${tableName})
+            PROPERTIES ("type" = "full")
+        """
+        syncer.waitSnapshotFinish()
+        assertTrue(syncer.getSnapshot("${snapshotName}", "${tableName}"))
+        assertTrue(syncer.restoreSnapshot(true))
+        syncer.waitTargetRestoreFinish()
+        target_sql " sync "
+        res = target_sql "SELECT * FROM ${tableName}"
+        assertEquals(res.size(), expected_row_count(tableName))
+        run_sql(tableName)
+    }
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_backup_restore_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_backup_restore_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_backup_restore_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_backup_restore_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]