This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e0c1d5321af [feature](restore) support atomic restore (#41104)
e0c1d5321af is described below
commit e0c1d5321afb5c5f6c00ac9c27a2d4a7b1a195d1
Author: walter <[email protected]>
AuthorDate: Tue Sep 24 09:41:53 2024 +0800
[feature](restore) support atomic restore (#41104)
Cherry-pick #40353, #40734, #40817, #40876, #40921, #41017, #41083
---
be/src/olap/rowset/rowset_meta_manager.cpp | 63 ++++
be/src/olap/rowset/rowset_meta_manager.h | 3 +
be/src/olap/snapshot_manager.cpp | 42 ++-
be/src/olap/snapshot_manager.h | 1 +
be/src/olap/tablet.cpp | 5 +
be/src/olap/tablet.h | 1 +
be/src/olap/tablet_manager.cpp | 30 +-
be/src/olap/task/engine_storage_migration_task.cpp | 83 ++++-
be/src/olap/task/engine_storage_migration_task.h | 5 +-
.../org/apache/doris/analysis/RestoreStmt.java | 9 +
.../org/apache/doris/backup/BackupHandler.java | 6 +-
.../apache/doris/backup/RestoreFileMapping.java | 18 +-
.../java/org/apache/doris/backup/RestoreJob.java | 376 ++++++++++++++++++---
.../main/java/org/apache/doris/catalog/Env.java | 8 +
.../java/org/apache/doris/catalog/OlapTable.java | 19 ++
.../org/apache/doris/catalog/TableProperty.java | 24 +-
.../apache/doris/common/util/PropertyAnalyzer.java | 1 +
.../apache/doris/datasource/InternalCatalog.java | 14 +-
.../apache/doris/service/FrontendServiceImpl.java | 3 +
.../java/org/apache/doris/task/SnapshotTask.java | 13 +-
.../doris/backup/RestoreFileMappingTest.java | 6 +-
.../org/apache/doris/backup/RestoreJobTest.java | 3 +-
gensrc/thrift/AgentService.thrift | 1 +
gensrc/thrift/FrontendService.thrift | 1 +
.../backup_restore/test_backup_restore_atomic.out | 78 +++++
.../test_backup_restore_atomic_with_view.out | 60 ++++
.../org/apache/doris/regression/suite/Suite.groovy | 10 +
.../test_backup_restore_atomic.groovy | 209 ++++++++++++
.../test_backup_restore_atomic_cancel.groovy | 128 +++++++
.../test_backup_restore_atomic_with_alter.groovy | 241 +++++++++++++
.../test_backup_restore_atomic_with_view.groovy | 124 +++++++
31 files changed, 1505 insertions(+), 80 deletions(-)
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp
b/be/src/olap/rowset/rowset_meta_manager.cpp
index db0ad8f3d45..87b32cd9393 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -361,6 +361,69 @@ Status
RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
return status;
}
+Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid
tablet_uid,
+ Version version,
RowsetBinlogMetasPB* metas_pb) {
+ Status status;
+ auto tablet_uid_str = tablet_uid.to_string();
+ auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
+ auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
+ auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
+ auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
+ std::string_view key, std::string_view value)
-> bool {
+ VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}",
key, value);
+ if (key.compare(end_key) > 0) { // the binlog meta key is binary
comparable.
+            // All binlog metas have been scanned
+ return false;
+ }
+
+ if (!starts_with_binlog_meta(key)) {
+ auto err_msg = fmt::format("invalid binlog meta key:{}", key);
+ status = Status::InternalError(err_msg);
+ LOG(WARNING) << err_msg;
+ return false;
+ }
+
+ BinlogMetaEntryPB binlog_meta_entry_pb;
+ if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
+ auto err_msg = fmt::format("fail to parse binlog meta value:{}",
value);
+ status = Status::InternalError(err_msg);
+ LOG(WARNING) << err_msg;
+ return false;
+ }
+
+ const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
+ auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
+ binlog_meta_pb->set_rowset_id(rowset_id);
+ binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
+ binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
+ binlog_meta_pb->set_meta_key(std::string {key});
+ binlog_meta_pb->set_meta(std::string {value});
+
+ auto binlog_data_key =
+ make_binlog_data_key(tablet_uid_str,
binlog_meta_entry_pb.version(), rowset_id);
+ std::string binlog_data;
+ status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key,
&binlog_data);
+ if (!status.ok()) {
+ LOG(WARNING) << status.to_string();
+ return false;
+ }
+ binlog_meta_pb->set_data_key(binlog_data_key);
+ binlog_meta_pb->set_data(binlog_data);
+
+ return false;
+ };
+
+ Status iterStatus =
+ meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key,
traverse_func);
+ if (!iterStatus.ok()) {
+ LOG(WARNING) << fmt::format(
+ "fail to iterate binlog meta. prefix_key:{}, version:{},
status:{}", prefix_key,
+ version.to_string(), iterStatus.to_string());
+ return iterStatus;
+ }
+ return status;
+}
+
Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const
TabletUid tablet_uid,
RowsetBinlogMetasPB*
metas_pb) {
Status status;
diff --git a/be/src/olap/rowset/rowset_meta_manager.h
b/be/src/olap/rowset/rowset_meta_manager.h
index d8cf9c37152..f5b6f077676 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -66,6 +66,9 @@ public:
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid
tablet_uid,
const std::vector<int64_t>&
binlog_versions,
RowsetBinlogMetasPB* metas_pb);
+ // get all binlog metas of a tablet in version.
+ static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid
tablet_uid,
+ Version version,
RowsetBinlogMetasPB* metas_pb);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 0fcc09cec49..f55980ce6c4 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -86,16 +86,33 @@ Status SnapshotManager::make_snapshot(const
TSnapshotRequest& request, string* s
return Status::Error<INVALID_ARGUMENT>("output parameter cannot be
null");
}
- TabletSharedPtr ref_tablet =
+ TabletSharedPtr target_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(request.tablet_id);
- DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", {
ref_tablet = nullptr; })
+ DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", {
target_tablet = nullptr; })
- if (ref_tablet == nullptr) {
+ if (target_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", request.tablet_id);
}
- res = _create_snapshot_files(ref_tablet, request, snapshot_path,
allow_incremental_clone);
+ TabletSharedPtr ref_tablet = target_tablet;
+ if (request.__isset.ref_tablet_id) {
+ int64_t ref_tablet_id = request.ref_tablet_id;
+ TabletSharedPtr base_tablet =
+
StorageEngine::instance()->tablet_manager()->get_tablet(ref_tablet_id);
+
+ // Some tasks, like medium migration, cause the target tablet and base
tablet to stay on
+ // different disks. In this case, we fall through to the normal
restore path.
+ //
+ // Otherwise, we can directly link the rowset files from the base
tablet to the target tablet.
+ if (base_tablet != nullptr &&
+ base_tablet->data_dir()->path() ==
target_tablet->data_dir()->path()) {
+ ref_tablet = std::move(base_tablet);
+ }
+ }
+
+ res = _create_snapshot_files(ref_tablet, target_tablet, request,
snapshot_path,
+ allow_incremental_clone);
if (!res.ok()) {
LOG(WARNING) << "failed to make snapshot. res=" << res << " tablet="
<< request.tablet_id;
@@ -378,6 +395,7 @@ Status check_version_continuity(const
std::vector<RowsetSharedPtr>& rowsets) {
}
Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr&
ref_tablet,
+ const TabletSharedPtr&
target_tablet,
const TSnapshotRequest& request,
string* snapshot_path,
bool* allow_incremental_clone) {
@@ -397,10 +415,10 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
timeout_s = request.timeout;
}
std::string snapshot_id_path;
- res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path);
+ res = _calc_snapshot_id_path(target_tablet, timeout_s, &snapshot_id_path);
if (!res.ok()) {
- LOG(WARNING) << "failed to calc snapshot_id_path, ref tablet="
- << ref_tablet->data_dir()->path();
+ LOG(WARNING) << "failed to calc snapshot_id_path, tablet="
+ << target_tablet->data_dir()->path();
return res;
}
@@ -408,12 +426,12 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
// schema_full_path_desc.filepath:
// /snapshot_id_path/tablet_id/schema_hash/
- auto schema_full_path = get_schema_hash_full_path(ref_tablet,
snapshot_id_path);
+ auto schema_full_path = get_schema_hash_full_path(target_tablet,
snapshot_id_path);
// header_path:
// /schema_full_path/tablet_id.hdr
- auto header_path = _get_header_full_path(ref_tablet, schema_full_path);
+ auto header_path = _get_header_full_path(target_tablet, schema_full_path);
// /schema_full_path/tablet_id.hdr.json
- auto json_header_path = _get_json_header_full_path(ref_tablet,
schema_full_path);
+ auto json_header_path = _get_json_header_full_path(target_tablet,
schema_full_path);
bool exists = true;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(schema_full_path,
&exists));
if (exists) {
@@ -595,7 +613,9 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
<< rs->rowset_meta()->empty();
}
if (!res.ok()) {
- LOG(WARNING) << "fail to create hard link. [path=" <<
snapshot_id_path << "]";
+ LOG(WARNING) << "fail to create hard link. path=" <<
snapshot_id_path
+ << " tablet=" << target_tablet->tablet_id()
+ << " ref tablet=" << ref_tablet->tablet_id();
break;
}
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 23b38dc302c..78b9db8659b 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -76,6 +76,7 @@ private:
const std::vector<RowsetSharedPtr>&
consistent_rowsets);
Status _create_snapshot_files(const TabletSharedPtr& ref_tablet,
+ const TabletSharedPtr& target_tablet,
const TSnapshotRequest& request,
std::string* snapshot_path,
bool* allow_incremental_clone);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b2d5f9c114d..7c85e8238f8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3999,6 +3999,11 @@ Status Tablet::get_rowset_binlog_metas(const
std::vector<int64_t>& binlog_versio
binlog_versions,
metas_pb);
}
+Status Tablet::get_rowset_binlog_metas(Version binlog_versions,
RowsetBinlogMetasPB* metas_pb) {
+ return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(),
tablet_uid(),
+ binlog_versions,
metas_pb);
+}
+
std::string Tablet::get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const
{
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id,
segment_index);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 54d841b6bdd..5ca2248c5b5 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -548,6 +548,7 @@ public:
std::string_view rowset_id) const;
Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
+ Status get_rowset_binlog_metas(Version binlog_versions,
RowsetBinlogMetasPB* metas_pb);
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t
segment_index) const;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 573ec920160..981d8de2de5 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -276,9 +276,12 @@ Status TabletManager::create_tablet(const
TCreateTabletReq& request, std::vector
// we need use write lock on shard-1 and then use read lock on shard-2
// if there have create rollup tablet C(assume on shard-2) from tablet
D(assume on shard-1) at the same time, we will meet deadlock
std::unique_lock two_tablet_lock(_two_tablet_mtx, std::defer_lock);
- bool is_schema_change = request.__isset.base_tablet_id &&
request.base_tablet_id > 0;
- bool need_two_lock = is_schema_change && ((_tablets_shards_mask &
request.base_tablet_id) !=
- (_tablets_shards_mask &
tablet_id));
+ bool in_restore_mode = request.__isset.in_restore_mode &&
request.in_restore_mode;
+ bool is_schema_change_or_atomic_restore =
+ request.__isset.base_tablet_id && request.base_tablet_id > 0;
+ bool need_two_lock =
+ is_schema_change_or_atomic_restore &&
+ ((_tablets_shards_mask & request.base_tablet_id) !=
(_tablets_shards_mask & tablet_id));
if (need_two_lock) {
SCOPED_TIMER(ADD_TIMER(profile, "GetTwoTableLock"));
two_tablet_lock.lock();
@@ -307,7 +310,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq&
request, std::vector
TabletSharedPtr base_tablet = nullptr;
    // If the CreateTabletReq has base_tablet_id then it is an alter-tablet
request
- if (is_schema_change) {
+ if (is_schema_change_or_atomic_restore) {
        // if base_tablet_id's lock is different from new_tablet_id's, we need to lock
it.
if (need_two_lock) {
SCOPED_TIMER(ADD_TIMER(profile, "GetBaseTablet"));
@@ -320,22 +323,28 @@ Status TabletManager::create_tablet(const
TCreateTabletReq& request, std::vector
if (base_tablet == nullptr) {
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
return Status::Error<TABLE_CREATE_META_ERROR>(
- "fail to create tablet(change schema), base tablet does
not exist. "
- "new_tablet_id={}, base_tablet_id={}",
+ "fail to create tablet(change schema/atomic restore), base
tablet does not "
+ "exist. new_tablet_id={}, base_tablet_id={}",
tablet_id, request.base_tablet_id);
}
- // If we are doing schema-change, we should use the same data dir
+ // If we are doing schema-change or atomic-restore, we should use the
same data dir
        // TODO(lingbin): A little trick here, the directory should be
determined before
// entering this method
- if (request.storage_medium ==
base_tablet->data_dir()->storage_medium()) {
+ //
+ // ATTN: Since all restored replicas will be saved to HDD, so no
storage_medium check here.
+ if (in_restore_mode ||
+ request.storage_medium ==
base_tablet->data_dir()->storage_medium()) {
+ LOG(INFO) << "create tablet use the base tablet data dir.
tablet_id=" << tablet_id
+ << ", base tablet_id=" << request.base_tablet_id
+ << ", data dir=" << base_tablet->data_dir()->path();
stores.clear();
stores.push_back(base_tablet->data_dir());
}
}
// set alter type to schema-change. it is useless
- TabletSharedPtr tablet = _internal_create_tablet_unlocked(request,
is_schema_change,
-
base_tablet.get(), stores, profile);
+ TabletSharedPtr tablet = _internal_create_tablet_unlocked(
+ request, is_schema_change_or_atomic_restore, base_tablet.get(),
stores, profile);
if (tablet == nullptr) {
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
return Status::Error<CE_CMD_PARAMS_ERROR>("fail to create tablet.
tablet_id={}",
@@ -947,6 +956,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store,
TTabletId tablet_id,
if (binlog_meta_filesize > 0) {
contain_binlog = true;
RETURN_IF_ERROR(read_pb(binlog_metas_file,
&rowset_binlog_metas_pb));
+ VLOG_DEBUG << "load rowset binlog metas from file. file_path=" <<
binlog_metas_file;
}
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file));
}
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 239f99bb40b..218922069c7 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -37,6 +37,7 @@
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
+#include "olap/pb_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
@@ -258,9 +259,11 @@ Status EngineStorageMigrationTask::_migrate() {
}
std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+ RowsetBinlogMetasPB rowset_binlog_metas_pb;
do {
// migrate all index and data files but header file
- res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+ res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
+ &rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
@@ -288,7 +291,8 @@ Status EngineStorageMigrationTask::_migrate() {
// we take the lock to complete it to avoid long-term competition
with other tasks
if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets))
{
// force to copy the remaining data and index
- res = _copy_index_and_data_files(full_path,
temp_consistent_rowsets);
+ res = _copy_index_and_data_files(full_path,
temp_consistent_rowsets,
+ &rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
@@ -303,6 +307,16 @@ Status EngineStorageMigrationTask::_migrate() {
}
}
+ // save rowset binlog metas
+ if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) {
+ auto rowset_binlog_metas_pb_filename =
+ fmt::format("{}/rowset_binlog_metas.pb", full_path);
+ res = write_pb(rowset_binlog_metas_pb_filename,
rowset_binlog_metas_pb);
+ if (!res.ok()) {
+ break;
+ }
+ }
+
// generate new tablet meta and write to hdr file
res = _gen_and_write_header_to_hdr_file(shard, full_path,
consistent_rowsets, end_version);
if (!res.ok()) {
@@ -346,10 +360,73 @@ void EngineStorageMigrationTask::_generate_new_header(
}
Status EngineStorageMigrationTask::_copy_index_and_data_files(
- const string& full_path, const std::vector<RowsetSharedPtr>&
consistent_rowsets) const {
+ const string& full_path, const std::vector<RowsetSharedPtr>&
consistent_rowsets,
+ RowsetBinlogMetasPB* all_binlog_metas_pb) const {
+ RowsetBinlogMetasPB rowset_binlog_metas_pb;
for (const auto& rs : consistent_rowsets) {
RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));
+
+ Version binlog_versions = rs->version();
+ RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions,
&rowset_binlog_metas_pb));
}
+
+ // copy index binlog files.
+ for (const auto& rowset_binlog_meta :
rowset_binlog_metas_pb.rowset_binlog_metas()) {
+ auto num_segments = rowset_binlog_meta.num_segments();
+ std::string_view rowset_id = rowset_binlog_meta.rowset_id();
+
+ RowsetMetaPB rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+ auto err_msg = fmt::format("fail to parse binlog meta data
value:{}",
+ rowset_binlog_meta.data());
+ LOG(WARNING) << err_msg;
+ return Status::InternalError(err_msg);
+ }
+ const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+ TabletSchema tablet_schema;
+ tablet_schema.init_from_pb(tablet_schema_pb);
+
+ // copy segment files and index files
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ std::string segment_file_path =
_tablet->get_segment_filepath(rowset_id, segment_index);
+ auto snapshot_segment_file_path =
+ fmt::format("{}/{}_{}.binlog", full_path, rowset_id,
segment_index);
+
+ Status status =
io::global_local_filesystem()->copy_path(segment_file_path,
+
snapshot_segment_file_path);
+ if (!status.ok()) {
+ LOG(WARNING) << "fail to copy binlog segment file. [src=" <<
segment_file_path
+ << ", dest=" << snapshot_segment_file_path << "]"
<< status;
+ return status;
+ }
+ VLOG_DEBUG << "copy " << segment_file_path << " to " <<
snapshot_segment_file_path;
+
+ for (const auto& index : tablet_schema.indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ auto index_file =
+ _tablet->get_segment_index_filepath(rowset_id,
segment_index, index_id);
+ auto snapshot_segment_index_file_path = fmt::format(
+ "{}/{}_{}_{}.binlog-index", full_path, rowset_id,
segment_index, index_id);
+ VLOG_DEBUG << "copy " << index_file << " to " <<
snapshot_segment_index_file_path;
+ status = io::global_local_filesystem()->copy_path(index_file,
+
snapshot_segment_index_file_path);
+ if (!status.ok()) {
+ LOG(WARNING) << "fail to copy binlog index file. [src=" <<
index_file
+ << ", dest=" <<
snapshot_segment_index_file_path << "]" << status;
+ return status;
+ }
+ }
+ }
+ }
+
+ std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
+ rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
+ google::protobuf::RepeatedFieldBackInserter(
+ all_binlog_metas_pb->mutable_rowset_binlog_metas()));
+
return Status::OK();
}
diff --git a/be/src/olap/task/engine_storage_migration_task.h
b/be/src/olap/task/engine_storage_migration_task.h
index e4fc89107c1..f25fecbd178 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -20,6 +20,8 @@
#include <stdint.h>
+#include <gen_cpp/olap_file.pb.h>
+
#include <mutex>
#include <shared_mutex>
#include <string>
@@ -72,7 +74,8 @@ private:
// TODO: hkp
// rewrite this function
Status _copy_index_and_data_files(const std::string& full_path,
- const std::vector<RowsetSharedPtr>&
consistent_rowsets) const;
+ const std::vector<RowsetSharedPtr>&
consistent_rowsets,
+ RowsetBinlogMetasPB*
all_binlog_metas_pb) const;
private:
// tablet to do migrated
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index fe66f0ee4cb..9585a2e5069 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -43,6 +43,7 @@ public class RestoreStmt extends AbstractBackupStmt {
public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE =
"reserve_dynamic_partition_enable";
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
+ public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc =
ReplicaAllocation.DEFAULT_ALLOCATION;
@@ -54,6 +55,7 @@ public class RestoreStmt extends AbstractBackupStmt {
private boolean isBeingSynced = false;
private boolean isCleanTables = false;
private boolean isCleanPartitions = false;
+ private boolean isAtomicRestore = false;
private byte[] meta = null;
private byte[] jobInfo = null;
@@ -121,6 +123,10 @@ public class RestoreStmt extends AbstractBackupStmt {
return isCleanPartitions;
}
+ public boolean isAtomicRestore() {
+ return isAtomicRestore;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
@@ -199,6 +205,9 @@ public class RestoreStmt extends AbstractBackupStmt {
// is clean partitions
isCleanPartitions = eatBooleanProperty(copiedProperties,
PROP_CLEAN_PARTITIONS, isCleanPartitions);
+ // is atomic restore
+ isAtomicRestore = eatBooleanProperty(copiedProperties,
PROP_ATOMIC_RESTORE, isAtomicRestore);
+
if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Unknown restore job properties: " +
copiedProperties.keySet());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 144b4e49360..136a13e0005 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -457,12 +457,14 @@ public class BackupHandler extends MasterDaemon
implements Writable {
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(),
stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
- stmt.isCleanTables(), stmt.isCleanPartitions(), env,
Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
+ stmt.isCleanTables(), stmt.isCleanPartitions(),
stmt.isAtomicRestore(),
+ env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(),
stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(),
stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(),
stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
- stmt.isBeingSynced(), stmt.isCleanTables(),
stmt.isCleanPartitions(), env, repository.getId());
+ stmt.isBeingSynced(), stmt.isCleanTables(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(),
+ env, repository.getId());
}
env.getEditLog().logRestoreJob(restoreJob);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
index 07ddf6844dc..f712afbb271 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreFileMapping.java
@@ -39,7 +39,7 @@ public class RestoreFileMapping implements Writable {
}
public IdChain(Long... ids) {
- Preconditions.checkState(ids.length == 5);
+ Preconditions.checkState(ids.length == 6);
chain = ids;
}
@@ -63,6 +63,14 @@ public class RestoreFileMapping implements Writable {
return chain[4];
}
+ public boolean hasRefTabletId() {
+ return chain.length >= 6 && chain[5] != -1L;
+ }
+
+ public long getRefTabletId() {
+ return chain[5];
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -78,8 +86,12 @@ public class RestoreFileMapping implements Writable {
return false;
}
+ if (((IdChain) obj).chain.length != chain.length) {
+ return false;
+ }
+
IdChain other = (IdChain) obj;
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < chain.length; i++) {
// DO NOT use ==, Long_1 != Long_2
if (!chain[i].equals(other.chain[i])) {
return false;
@@ -92,7 +104,7 @@ public class RestoreFileMapping implements Writable {
@Override
public int hashCode() {
int code = chain[0].hashCode();
- for (int i = 1; i < 5; i++) {
+ for (int i = 1; i < chain.length; i++) {
code ^= chain[i].hashCode();
}
return code;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 738c249d583..56921b6d02c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
@@ -114,6 +115,8 @@ public class RestoreJob extends AbstractJob {
private static final String PROP_IS_BEING_SYNCED =
PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
private static final String PROP_CLEAN_TABLES =
RestoreStmt.PROP_CLEAN_TABLES;
private static final String PROP_CLEAN_PARTITIONS =
RestoreStmt.PROP_CLEAN_PARTITIONS;
+ private static final String PROP_ATOMIC_RESTORE =
RestoreStmt.PROP_ATOMIC_RESTORE;
+ private static final String ATOMIC_RESTORE_TABLE_PREFIX =
"__doris_atomic_restore_prefix__";
private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
@@ -182,6 +185,8 @@ public class RestoreJob extends AbstractJob {
private boolean isCleanTables = false;
// Whether to delete existing partitions that are not involved in the
restore.
private boolean isCleanPartitions = false;
+ // Whether to restore the data into a temp table, and then replace the
origin one.
+ private boolean isAtomicRestore = false;
// restore properties
private Map<String, String> properties = Maps.newHashMap();
@@ -193,7 +198,7 @@ public class RestoreJob extends AbstractJob {
public RestoreJob(String label, String backupTs, long dbId, String dbName,
BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion,
boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables,
- boolean isCleanPartitions, Env env, long repoId) {
+ boolean isCleanPartitions, boolean isAtomicRestore, Env env, long
repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
@@ -210,19 +215,22 @@ public class RestoreJob extends AbstractJob {
this.isBeingSynced = isBeingSynced;
this.isCleanTables = isCleanTables;
this.isCleanPartitions = isCleanPartitions;
+ this.isAtomicRestore = isAtomicRestore;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE,
String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
properties.put(PROP_CLEAN_PARTITIONS,
String.valueOf(isCleanPartitions));
+ properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore));
}
public RestoreJob(String label, String backupTs, long dbId, String dbName,
BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion,
boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables,
- boolean isCleanPartitions, Env env, long repoId, BackupMeta
backupMeta) {
+ boolean isCleanPartitions, boolean isAtomicRestore, Env env, long
repoId, BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc,
timeoutMs, metaVersion, reserveReplica,
- reserveDynamicPartitionEnable, isBeingSynced, isCleanTables,
isCleanPartitions, env, repoId);
+ reserveDynamicPartitionEnable, isBeingSynced, isCleanTables,
isCleanPartitions, isAtomicRestore, env,
+ repoId);
this.backupMeta = backupMeta;
}
@@ -387,6 +395,12 @@ public class RestoreJob extends AbstractJob {
checkIfNeedCancel();
if (status.ok()) {
+ if (state != RestoreJobState.PENDING && label.equals(
+
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", "")))
{
+ LOG.info("pause restore job by debug point: {}", this);
+ return;
+ }
+
switch (state) {
case PENDING:
checkAndPrepareMeta();
@@ -494,8 +508,10 @@ public class RestoreJob extends AbstractJob {
}
Preconditions.checkNotNull(backupMeta);
- // Set all restored tbls' state to RESTORE
- // Table's origin state must be NORMAL and does not have unfinished
load job.
+ // Check the olap table state.
+ //
+ // If isAtomicRestore is not set, set all restored tbls' state to
RESTORE,
+ // the table's origin state must be NORMAL and does not have
unfinished load job.
for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
Table tbl =
db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
if (tbl == null) {
@@ -523,6 +539,13 @@ public class RestoreJob extends AbstractJob {
return;
}
+ if (isAtomicRestore) {
+ // We will create new OlapTable in atomic restore, so does
not set the RESTORE state.
+ // Instead, set table in atomic restore state, to forbid
the alter table operation.
+ olapTbl.setInAtomicRestore();
+ continue;
+ }
+
for (Partition partition : olapTbl.getPartitions()) {
if
(!env.getLoadInstance().checkPartitionLoadFinished(partition.getId(), null)) {
status = new Status(ErrCode.COMMON_ERROR,
@@ -584,6 +607,9 @@ public class RestoreJob extends AbstractJob {
}
}
+ // the new tablets -> { local tablet, schema hash, storage medium },
used in atomic restore.
+ Map<Long, TabletRef> tabletBases = new HashMap<>();
+
// Check and prepare meta objects.
AgentBatchTask batchTask = new AgentBatchTask();
db.readLock();
@@ -594,14 +620,15 @@ public class RestoreJob extends AbstractJob {
Table remoteTbl = backupMeta.getTable(tableName);
Preconditions.checkNotNull(remoteTbl);
Table localTbl =
db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
+ if (localTbl != null && localTbl.getType() != TableType.OLAP) {
+ // table already exist, but is not OLAP
+ status = new Status(ErrCode.COMMON_ERROR,
+ "The type of local table should be same as type of
remote table: "
+ + remoteTbl.getName());
+ return;
+ }
+
if (localTbl != null) {
- // table already exist, check schema
- if (localTbl.getType() != TableType.OLAP) {
- status = new Status(ErrCode.COMMON_ERROR,
- "The type of local table should be same as
type of remote table: "
- + remoteTbl.getName());
- return;
- }
OlapTable localOlapTbl = (OlapTable) localTbl;
OlapTable remoteOlapTbl = (OlapTable) remoteTbl;
@@ -647,28 +674,26 @@ public class RestoreJob extends AbstractJob {
PartitionItem localItem =
localPartInfo.getItem(localPartition.getId());
PartitionItem remoteItem = remoteOlapTbl
.getPartitionInfo().getItem(backupPartInfo.id);
- if (localItem.equals(remoteItem)) {
- // Same partition, same range
- if
(genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition,
- localTbl, backupPartInfo,
partitionName, tblInfo, remoteReplicaAlloc)) {
- return;
- }
- } else {
+ if (!localItem.equals(remoteItem)) {
// Same partition name, different range
status = new
Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
+ " in table " +
localTbl.getName()
+ " has different partition
item with partition in repository");
return;
}
- } else {
- // If this is a single partitioned table.
- if
(genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl,
- backupPartInfo, partitionName,
tblInfo, remoteReplicaAlloc)) {
- return;
- }
}
- } else {
+ if (isAtomicRestore) {
+ // skip gen file mapping for atomic
restore.
+ continue;
+ }
+
+ // Same partition, same range or a single
partitioned table.
+ if
(genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition,
+ localTbl, backupPartInfo,
partitionName, tblInfo, remoteReplicaAlloc)) {
+ return;
+ }
+ } else if (!isAtomicRestore) {
// partitions does not exist
PartitionInfo localPartitionInfo =
localOlapTbl.getPartitionInfo();
if (localPartitionInfo.getType() ==
PartitionType.RANGE
@@ -709,8 +734,10 @@ public class RestoreJob extends AbstractJob {
} finally {
localOlapTbl.readUnlock();
}
- } else {
- // Table does not exist
+ }
+
+ // Table does not exist or atomic restore
+ if (localTbl == null || isAtomicRestore) {
OlapTable remoteOlapTbl = (OlapTable) remoteTbl;
// Retain only expected restore partitions in this table;
Set<String> allPartNames =
remoteOlapTbl.getPartitionNames();
@@ -738,7 +765,18 @@ public class RestoreJob extends AbstractJob {
// DO NOT set remote table's new name here, cause we will
still need the origin name later
//
remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
remoteOlapTbl.setState(allowLoad ?
OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE);
- LOG.debug("put remote table {} to restoredTbls",
remoteOlapTbl.getName());
+
+ if (isAtomicRestore && localTbl != null) {
+ // bind the backends and base tablets from local tbl.
+ status =
bindLocalAndRemoteOlapTableReplicas((OlapTable) localTbl, remoteOlapTbl,
tabletBases);
+ if (!status.ok()) {
+ return;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("put remote table {} to restoredTbls",
remoteOlapTbl.getName());
+ }
restoredTbls.add(remoteOlapTbl);
}
} // end of all restore olap tables
@@ -797,6 +835,9 @@ public class RestoreJob extends AbstractJob {
// for now, nothing is modified in catalog
// generate create replica tasks for all restored partitions
+ if (isAtomicRestore && !restoredPartitions.isEmpty()) {
+ throw new RuntimeException("atomic restore is set, but the
restored partitions is not empty");
+ }
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable)
db.getTableNullable(entry.first);
Preconditions.checkNotNull(localTbl, localTbl.getName());
@@ -816,11 +857,12 @@ public class RestoreJob extends AbstractJob {
if (restoreTbl.getType() == TableType.OLAP) {
OlapTable restoreOlapTable = (OlapTable) restoreTbl;
for (Partition restorePart :
restoreOlapTable.getPartitions()) {
- createReplicas(db, batchTask, restoreOlapTable,
restorePart);
+ createReplicas(db, batchTask, restoreOlapTable,
restorePart, tabletBases);
BackupOlapTableInfo backupOlapTableInfo =
jobInfo.getOlapTableInfo(restoreOlapTable.getName());
genFileMapping(restoreOlapTable, restorePart,
backupOlapTableInfo.id,
backupOlapTableInfo.getPartInfo(restorePart.getName()),
- !allowLoad /* if allow load, do not overwrite
when commit */);
+ !allowLoad /* if allow load, do not overwrite
when commit */,
+ tabletBases);
}
}
// set restored table's new name after all 'genFileMapping'
@@ -828,6 +870,9 @@ public class RestoreJob extends AbstractJob {
if (Env.isStoredTableNamesLowerCase()) {
tableName = tableName.toLowerCase();
}
+ if (restoreTbl.getType() == TableType.OLAP && isAtomicRestore)
{
+ tableName = tableAliasWithAtomicRestore(tableName);
+ }
restoreTbl.setName(tableName);
}
@@ -947,6 +992,90 @@ public class RestoreJob extends AbstractJob {
// No log here, PENDING state restore job will redo this method
}
+ private Status bindLocalAndRemoteOlapTableReplicas(
+ OlapTable localOlapTbl, OlapTable remoteOlapTbl,
+ Map<Long, TabletRef> tabletBases) {
+ localOlapTbl.readLock();
+ try {
+ // The storage medium of the remote olap table's storage is HDD,
because we want to
+ // restore the tables in another cluster might without SSD.
+ //
+ // Keep the storage medium of the new olap table the same as the
old one, so that
+ // the replicas in the new olap table will not be migrated to
other storage mediums.
+ remoteOlapTbl.setStorageMedium(localOlapTbl.getStorageMedium());
+ for (Partition partition : remoteOlapTbl.getPartitions()) {
+ Partition localPartition =
localOlapTbl.getPartition(partition.getName());
+ if (localPartition == null) {
+ continue;
+ }
+ // Since the replicas are bound to the same disk, the storage
medium must be the same
+ // to avoid media migration.
+ TStorageMedium storageMedium = localOlapTbl.getPartitionInfo()
+
.getDataProperty(localPartition.getId()).getStorageMedium();
+
remoteOlapTbl.getPartitionInfo().getDataProperty(partition.getId())
+ .setStorageMedium(storageMedium);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bind local partition {} and remote partition {}
with same storage medium {}, name: {}",
+ localPartition.getId(), partition.getId(),
storageMedium, partition.getName());
+ }
+ for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ String indexName =
remoteOlapTbl.getIndexNameById(index.getId());
+ Long localIndexId =
localOlapTbl.getIndexIdByName(indexName);
+ MaterializedIndex localIndex = localIndexId == null ? null
: localPartition.getIndex(localIndexId);
+ if (localIndex == null) {
+ continue;
+ }
+ int schemaHash =
localOlapTbl.getSchemaHashByIndexId(localIndexId);
+ if (schemaHash == -1) {
+ return new Status(ErrCode.COMMON_ERROR, String.format(
+ "schema hash of local index %d is not found,
remote table=%d, remote index=%d, "
+ + "local table=%d, local index=%d",
localIndexId, remoteOlapTbl.getId(), index.getId(),
+ localOlapTbl.getId(), localIndexId));
+ }
+
+ List<Tablet> localTablets = localIndex.getTablets();
+ List<Tablet> remoteTablets = index.getTablets();
+ if (localTablets.size() != remoteTablets.size()) {
+ return new Status(ErrCode.COMMON_ERROR, String.format(
+ "the size of local tablet %s is not equals to
the remote %s, "
+ + "is_atomic_restore=true, remote table=%d,
remote index=%d, "
+ + "local table=%d, local index=%d",
localTablets.size(), remoteTablets.size(),
+ remoteOlapTbl.getId(), index.getId(),
localOlapTbl.getId(), localIndexId));
+ }
+ for (int i = 0; i < remoteTablets.size(); i++) {
+ Tablet localTablet = localTablets.get(i);
+ Tablet remoteTablet = remoteTablets.get(i);
+ List<Replica> localReplicas =
localTablet.getReplicas();
+ List<Replica> remoteReplicas =
remoteTablet.getReplicas();
+ if (localReplicas.size() != remoteReplicas.size()) {
+ return new Status(ErrCode.COMMON_ERROR,
String.format(
+ "the size of local replicas %s is not
equals to the remote %s, "
+ + "is_atomic_restore=true, remote
table=%d, remote index=%d, "
+ + "local table=%d, local index=%d, local
replicas=%d, remote replicas=%d",
+ localTablets.size(), remoteTablets.size(),
remoteOlapTbl.getId(),
+ index.getId(), localOlapTbl.getId(),
localIndexId, localReplicas.size(),
+ remoteReplicas.size()));
+ }
+ for (int j = 0; j < remoteReplicas.size(); j++) {
+ long backendId =
localReplicas.get(j).getBackendId();
+ remoteReplicas.get(j).setBackendId(backendId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bind local replica {} and remote
replica {} with same backend {}, table={}",
+ localReplicas.get(j).getId(),
remoteReplicas.get(j).getId(), backendId,
+ localOlapTbl.getName());
+ }
+ }
+ tabletBases.put(remoteTablet.getId(),
+ new TabletRef(localTablet.getId(), schemaHash,
storageMedium));
+ }
+ }
+ }
+ } finally {
+ localOlapTbl.readUnlock();
+ }
+ return Status.OK;
+ }
+
private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
LOG.info("begin to make snapshot. {} when restore content is ALL",
this);
// begin to make snapshots for all replicas
@@ -958,7 +1087,8 @@ public class RestoreJob extends AbstractJob {
AgentBatchTask batchTask = new AgentBatchTask();
db.readLock();
try {
- for (IdChain idChain : fileMapping.getMapping().keySet()) {
+ for (Map.Entry<IdChain, IdChain> entry :
fileMapping.getMapping().entrySet()) {
+ IdChain idChain = entry.getKey();
OlapTable tbl = (OlapTable)
db.getTableNullable(idChain.getTblId());
tbl.readLock();
try {
@@ -967,9 +1097,15 @@ public class RestoreJob extends AbstractJob {
Tablet tablet = index.getTablet(idChain.getTabletId());
Replica replica =
tablet.getReplicaById(idChain.getReplicaId());
long signature = env.getNextId();
+ boolean isRestoreTask = true;
+ // We don't care the visible version in restore job, the
end version is used.
+ long visibleVersion = -1L;
SnapshotTask task = new SnapshotTask(null,
replica.getBackendId(), signature, jobId, db.getId(),
- tbl.getId(), part.getId(), index.getId(),
tablet.getId(), part.getVisibleVersion(),
- tbl.getSchemaHashByIndexId(index.getId()),
timeoutMs, true /* is restore task*/);
+ tbl.getId(), part.getId(), index.getId(),
tablet.getId(), visibleVersion,
+ tbl.getSchemaHashByIndexId(index.getId()),
timeoutMs, isRestoreTask);
+ if (entry.getValue().hasRefTabletId()) {
+ task.setRefTabletId(entry.getValue().getRefTabletId());
+ }
batchTask.addTask(task);
unfinishedSignatureToId.put(signature, tablet.getId());
bePathsMap.put(replica.getBackendId(),
replica.getPathHash());
@@ -1057,6 +1193,11 @@ public class RestoreJob extends AbstractJob {
}
// Backward-compatible overload for the non-atomic restore path: create the
// replicas without binding them to any existing base tablets (tabletBases == null).
private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable localTbl, Partition restorePart) {
    createReplicas(db, batchTask, localTbl, restorePart, null);
}
+
+ private void createReplicas(Database db, AgentBatchTask batchTask,
OlapTable localTbl, Partition restorePart,
+ Map<Long, TabletRef> tabletBases) {
Set<String> bfColumns = localTbl.getCopiedBfColumns();
double bfFpp = localTbl.getBfFpp();
@@ -1071,8 +1212,12 @@ public class RestoreJob extends AbstractJob {
for (MaterializedIndex restoredIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
+ TabletRef baseTabletRef = tabletBases == null ? null :
tabletBases.get(restoreTablet.getId());
+ // All restored replicas will be saved to HDD by default.
+ TStorageMedium storageMedium = baseTabletRef == null
+ ? TStorageMedium.HDD : baseTabletRef.storageMedium;
TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
- restoredIdx.getId(), indexMeta.getSchemaHash(),
TStorageMedium.HDD);
+ restoredIdx.getId(), indexMeta.getSchemaHash(),
storageMedium);
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(),
tabletMeta);
for (Replica restoreReplica : restoreTablet.getReplicas()) {
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1081,7 +1226,7 @@ public class RestoreJob extends AbstractJob {
restoreTablet.getId(), restoreReplica.getId(),
indexMeta.getShortKeyColumnCount(),
indexMeta.getSchemaHash(),
restoreReplica.getVersion(),
indexMeta.getKeysType(), TStorageType.COLUMN,
- TStorageMedium.HDD /* all restored replicas will
be saved to HDD */,
+ storageMedium,
indexMeta.getSchema(), bfColumns, bfFpp, null,
localTbl.getCopiedIndexes(),
localTbl.isInMemory(),
@@ -1103,6 +1248,12 @@ public class RestoreJob extends AbstractJob {
binlogConfig, objectPool);
task.setInRestoreMode(true);
+ if (baseTabletRef != null) {
+ // ensure this replica is bound to the same backend
disk as the origin table's replica.
+ task.setBaseTablet(baseTabletRef.tabletId,
baseTabletRef.schemaHash);
+ LOG.info("set base tablet {} for replica {} in restore
job {}, tablet id={}",
+ baseTabletRef.tabletId,
restoreReplica.getId(), jobId, restoreTablet.getId());
+ }
batchTask.addTask(task);
}
}
@@ -1185,6 +1336,11 @@ public class RestoreJob extends AbstractJob {
// files in repo to files in local
// Backward-compatible overload for the non-atomic restore path: generate the
// repo-file -> local-file mapping without any base-tablet references
// (tabletBases == null).
private void genFileMapping(OlapTable localTbl, Partition localPartition, Long remoteTblId,
        BackupPartitionInfo backupPartInfo, boolean overwrite) {
    genFileMapping(localTbl, localPartition, remoteTblId, backupPartInfo, overwrite, null);
}
+
+ private void genFileMapping(OlapTable localTbl, Partition localPartition,
Long remoteTblId,
+ BackupPartitionInfo backupPartInfo, boolean overwrite, Map<Long,
TabletRef> tabletBases) {
for (MaterializedIndex localIdx :
localPartition.getMaterializedIndices(IndexExtState.VISIBLE)) {
LOG.debug("get index id: {}, index name: {}", localIdx.getId(),
localTbl.getIndexNameById(localIdx.getId()));
@@ -1195,10 +1351,21 @@ public class RestoreJob extends AbstractJob {
BackupTabletInfo backupTabletInfo =
backupIdxInfo.sortedTabletInfoList.get(i);
LOG.debug("get tablet mapping: {} to {}, index {}",
backupTabletInfo.id, localTablet.getId(), i);
for (Replica localReplica : localTablet.getReplicas()) {
- IdChain src = new IdChain(remoteTblId, backupPartInfo.id,
backupIdxInfo.id, backupTabletInfo.id,
- -1L /* no replica id */);
- IdChain dest = new IdChain(localTbl.getId(),
localPartition.getId(),
- localIdx.getId(), localTablet.getId(),
localReplica.getId());
+ long refTabletId = -1L;
+ if (tabletBases != null &&
tabletBases.containsKey(localTablet.getId())) {
+ refTabletId =
tabletBases.get(localTablet.getId()).tabletId;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("restored tablet {} is based on exists
tablet {}",
+ localTablet.getId(), refTabletId);
+ }
+ }
+
+ long noReplicaId = -1L;
+ long noRefTabletId = -1L;
+ IdChain src = new IdChain(remoteTblId, backupPartInfo.id,
backupIdxInfo.id,
+ backupTabletInfo.id, noReplicaId, refTabletId);
+ IdChain dest = new IdChain(localTbl.getId(),
localPartition.getId(), localIdx.getId(),
+ localTablet.getId(), localReplica.getId(),
noRefTabletId);
fileMapping.putMapping(dest, src, overwrite);
}
}
@@ -1245,6 +1412,12 @@ public class RestoreJob extends AbstractJob {
OlapTable olapTbl = (OlapTable) tbl;
tbl.writeLock();
try {
+ if (isAtomicRestore) {
+ // Atomic restore will creates new replica of the
OlapTable.
+ olapTbl.setInAtomicRestore();
+ continue;
+ }
+
olapTbl.setState(OlapTableState.RESTORE);
// set restore status for partitions
BackupOlapTableInfo tblInfo =
jobInfo.backupOlapTableObjects.get(tableName);
@@ -1365,7 +1538,7 @@ public class RestoreJob extends AbstractJob {
}
private void downloadRemoteSnapshots() {
- // Categorize snapshot onfos by db id.
+ // Categorize snapshot infos by db id.
ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos =
ArrayListMultimap.create();
for (SnapshotInfo info : snapshotInfos.values()) {
dbToSnapshotInfos.put(info.getDbId(), info);
@@ -1460,8 +1633,9 @@ public class RestoreJob extends AbstractJob {
return;
}
+ long refTabletId = -1L; // no ref tablet id
IdChain catalogIds = new IdChain(tbl.getId(),
part.getId(), idx.getId(),
- info.getTabletId(), replica.getId());
+ info.getTabletId(), replica.getId(),
refTabletId);
IdChain repoIds = fileMapping.get(catalogIds);
if (repoIds == null) {
status = new Status(ErrCode.NOT_FOUND,
@@ -1603,8 +1777,9 @@ public class RestoreJob extends AbstractJob {
return;
}
+ long refTabletId = -1L; // no ref tablet id
IdChain catalogIds = new IdChain(tbl.getId(),
part.getId(), idx.getId(),
- info.getTabletId(), replica.getId());
+ info.getTabletId(), replica.getId(),
refTabletId);
IdChain repoIds = fileMapping.get(catalogIds);
if (repoIds == null) {
status = new Status(ErrCode.NOT_FOUND,
@@ -1746,6 +1921,14 @@ public class RestoreJob extends AbstractJob {
return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does
not exist");
}
+ // replace the origin tables in atomic.
+ if (isAtomicRestore) {
+ Status st = atomicReplaceOlapTables(db, isReplay);
+ if (!st.ok()) {
+ return st;
+ }
+ }
+
// set all restored partition version and version hash
// set all tables' state to NORMAL
setTableStateToNormalAndUpdateProperties(db, true, isReplay);
@@ -1994,6 +2177,12 @@ public class RestoreJob extends AbstractJob {
// remove restored tbls
for (Table restoreTbl : restoredTbls) {
+ if (isAtomicRestore && restoreTbl.getType() == TableType.OLAP
+ &&
!restoreTbl.getName().startsWith(ATOMIC_RESTORE_TABLE_PREFIX)) {
+ // In atomic restore, a table registered to db must have a
name with the prefix,
+ // otherwise, it has not been registered and can be
ignored here.
+ continue;
+ }
LOG.info("remove restored table when cancelled: {}",
restoreTbl.getName());
if (db.writeLockIfExist()) {
try {
@@ -2070,6 +2259,86 @@ public class RestoreJob extends AbstractJob {
LOG.info("finished to cancel restore job. is replay: {}. {}",
isReplay, this);
}
/**
 * Atomically swaps each restored temp table (named with the atomic-restore alias
 * prefix) into place of the origin table, for every OLAP table in the job.
 *
 * For each table: look up the temp table under the alias name, drop both the
 * alias entry and any existing origin table from the db, rename the temp table
 * to the origin name, re-register it, set its state to NORMAL, and finally erase
 * the old table's tablets. Runs under the db write lock; the table-level write
 * locks are taken per table inside.
 *
 * @param db       the database containing both the temp and origin tables
 * @param isReplay true when invoked from edit-log replay (passed through to
 *                 onEraseOlapTable)
 * @return Status.OK on success (also when the db no longer exists), otherwise a
 *         COMMON_ERROR describing the failed replacement
 */
private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
    assert isAtomicRestore;
    for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
        String originName = jobInfo.getAliasByOriginNameIfSet(tableName);
        if (Env.isStoredTableNamesLowerCase()) {
            originName = originName.toLowerCase();
        }
        String aliasName = tableAliasWithAtomicRestore(originName);

        // If the db has been dropped concurrently there is nothing to replace.
        if (!db.writeLockIfExist()) {
            return Status.OK;
        }
        try {
            Table newTbl = db.getTableNullable(aliasName);
            if (newTbl == null) {
                LOG.warn("replace table from {} to {}, but the temp table is not found", aliasName, originName);
                return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table "
                        + aliasName + " is not found");
            }
            if (newTbl.getType() != TableType.OLAP) {
                LOG.warn("replace table from {} to {}, but the temp table is not OLAP, it type is {}",
                        aliasName, originName, newTbl.getType());
                return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table " + aliasName
                        + " is not OLAP table, it is " + newTbl.getType());
            }

            OlapTable originOlapTbl = null;
            Table originTbl = db.getTableNullable(originName);
            if (originTbl != null) {
                if (originTbl.getType() != TableType.OLAP) {
                    LOG.warn("replace table from {} to {}, but the origin table is not OLAP, it type is {}",
                            aliasName, originName, originTbl.getType());
                    return new Status(ErrCode.COMMON_ERROR, "replace table failed, the origin table "
                            + originName + " is not OLAP table, it is " + originTbl.getType());
                }
                originOlapTbl = (OlapTable) originTbl; // save the origin olap table, then drop it.
            }

            // replace the table.
            OlapTable newOlapTbl = (OlapTable) newTbl;
            newOlapTbl.writeLock();
            try {
                // rename new table name to origin table name and add the new table to database.
                // NOTE: both drops happen before the rename so the origin name is free;
                // checkAndSetName(originName, false) performs the actual rename.
                db.dropTable(aliasName);
                db.dropTable(originName);
                newOlapTbl.checkAndSetName(originName, false);
                db.createTable(newOlapTbl);

                // set the olap table state to normal immediately for querying
                newOlapTbl.setState(OlapTableState.NORMAL);
                LOG.info("atomic restore replace table {} name to {}, and set state to normal, origin table={}",
                        newOlapTbl.getId(), originName, originOlapTbl == null ? -1L : originOlapTbl.getId());
            } catch (DdlException e) {
                LOG.warn("atomic restore replace table {} name from {} to {}",
                        newOlapTbl.getId(), aliasName, originName, e);
                return new Status(ErrCode.COMMON_ERROR, "replace table from " + aliasName + " to " + originName
                        + " failed, reason=" + e.getMessage());
            } finally {
                newOlapTbl.writeUnlock();
            }

            if (originOlapTbl != null) {
                // The origin table is not used anymore, need to drop all its tablets.
                originOlapTbl.writeLock();
                try {
                    LOG.info("drop the origin olap table {} by atomic restore. table={}",
                            originOlapTbl.getName(), originOlapTbl.getId());
                    Env.getCurrentEnv().onEraseOlapTable(originOlapTbl, isReplay);
                } finally {
                    originOlapTbl.writeUnlock();
                }
            }
        } finally {
            db.writeUnlock();
        }
    }

    return Status.OK;
}
+
private void setTableStateToNormalAndUpdateProperties(Database db, boolean
committed, boolean isReplay) {
for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
Table tbl =
db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
@@ -2090,6 +2359,10 @@ public class RestoreJob extends AbstractJob {
|| olapTbl.getState() ==
OlapTableState.RESTORE_WITH_LOAD) {
olapTbl.setState(OlapTableState.NORMAL);
}
+ if (olapTbl.isInAtomicRestore()) {
+ olapTbl.clearInAtomicRestore();
+ LOG.info("table {} set state from atomic restore to
normal", tableName);
+ }
BackupOlapTableInfo tblInfo =
jobInfo.backupOlapTableObjects.get(tableName);
for (Map.Entry<String, BackupPartitionInfo> partitionEntry :
tblInfo.partitions.entrySet()) {
@@ -2278,6 +2551,7 @@ public class RestoreJob extends AbstractJob {
isBeingSynced =
Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
isCleanTables =
Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions =
Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
+ isAtomicRestore =
Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
}
@Override
@@ -2287,5 +2561,21 @@ public class RestoreJob extends AbstractJob {
sb.append(", state: ").append(state.name());
return sb.toString();
}
+
+ public static String tableAliasWithAtomicRestore(String tableName) {
+ return ATOMIC_RESTORE_TABLE_PREFIX + tableName;
+ }
+
// Reference to an existing local tablet that a restored tablet is based on in
// atomic restore: { local tablet id, schema hash, storage medium }.
private static class TabletRef {
    public long tabletId;       // id of the existing local (base) tablet
    public int schemaHash;      // schema hash of the local index the base tablet belongs to
    public TStorageMedium storageMedium; // medium of the local partition, reused to avoid media migration

    TabletRef(long tabletId, int schemaHash, TStorageMedium storageMedium) {
        this.tabletId = tabletId;
        this.schemaHash = schemaHash;
        this.storageMedium = storageMedium;
    }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 91154b8d76c..827e4a222b2 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -78,6 +78,7 @@ import org.apache.doris.analysis.TableRenameClause;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.backup.BackupHandler;
+import org.apache.doris.backup.RestoreJob;
import org.apache.doris.binlog.BinlogGcer;
import org.apache.doris.binlog.BinlogManager;
import org.apache.doris.blockrule.SqlBlockRuleMgr;
@@ -3308,6 +3309,10 @@ public class Env {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE).append("\"
= \"");
sb.append(olapTable.getEnableMowLightDelete()).append("\"");
+ if (olapTable.isInAtomicRestore()) {
+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE).append("\"
= \"true\"");
+ }
+
sb.append("\n)");
} else if (table.getType() == TableType.MYSQL) {
MysqlTable mysqlTable = (MysqlTable) table;
@@ -4132,6 +4137,9 @@ public class Env {
if (db.getTable(newTableName).isPresent()) {
throw new DdlException("Table name[" + newTableName + "]
is already used");
}
+ if
(db.getTable(RestoreJob.tableAliasWithAtomicRestore(newTableName)).isPresent())
{
+ throw new DdlException("Table name[" + newTableName + "]
is already used (in restoring)");
+ }
if (table.getType() == TableType.OLAP) {
// olap table should also check if any rollup has same
name as "newTableName"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 912286d63b4..f4c77b9cbb8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1737,6 +1737,10 @@ public class OlapTable extends Table {
if (state != OlapTableState.NORMAL) {
throw new DdlException("Table[" + name + "]'s state is not NORMAL.
Do not allow doing ALTER ops");
}
+ if (tableProperty != null && tableProperty.isInAtomicRestore()) {
+ throw new DdlException("Table[" + name + "] is in atomic restore
state. "
+ + "Do not allow doing ALTER ops");
+ }
}
public boolean isStable(SystemInfoService infoService, TabletScheduler
tabletScheduler, String clusterName) {
@@ -1974,6 +1978,21 @@ public class OlapTable extends Table {
return "";
}
// Marks this table as the target of an in-progress atomic restore; while the
// flag is set, ALTER operations on the table are rejected.
public void setInAtomicRestore() {
    getOrCreatTableProperty().setInAtomicRestore().buildInAtomicRestore();
}
+
// Clears the atomic-restore flag (removes the property and rebuilds the cached
// value), re-allowing ALTER operations on this table.
public void clearInAtomicRestore() {
    getOrCreatTableProperty().clearInAtomicRestore().buildInAtomicRestore();
}
+
+ public boolean isInAtomicRestore() {
+ if (tableProperty != null) {
+ return tableProperty.isInAtomicRestore();
+ }
+ return false;
+ }
+
public boolean getEnableLightSchemaChange() {
if (tableProperty != null) {
return tableProperty.getUseSchemaLightChange();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 996b4a8a0c5..2904dda1efe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -59,6 +59,7 @@ public class TableProperty implements Writable {
private DynamicPartitionProperty dynamicPartitionProperty = new
DynamicPartitionProperty(Maps.newHashMap());
private ReplicaAllocation replicaAlloc =
ReplicaAllocation.DEFAULT_ALLOCATION;
private boolean isInMemory = false;
+ private boolean isInAtomicRestore = false;
private String storagePolicy = "";
private Boolean isBeingSynced = null;
@@ -190,6 +191,26 @@ public class TableProperty implements Writable {
return this;
}
+ public TableProperty buildInAtomicRestore() {
+ isInAtomicRestore = Boolean.parseBoolean(properties.getOrDefault(
+ PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "false"));
+ return this;
+ }
+
// Returns the cached flag; callers must have invoked buildInAtomicRestore()
// (directly or via the deserialization path) for it to reflect the properties map.
public boolean isInAtomicRestore() {
    return isInAtomicRestore;
}
+
// Records the atomic-restore marker in the persisted properties map.
// Note: only updates the map; the cached flag is refreshed by buildInAtomicRestore().
public TableProperty setInAtomicRestore() {
    properties.put(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "true");
    return this;
}
+
// Removes the atomic-restore marker from the persisted properties map.
// Note: only updates the map; the cached flag is refreshed by buildInAtomicRestore().
public TableProperty clearInAtomicRestore() {
    properties.remove(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE);
    return this;
}
+
public TableProperty buildEnableLightSchemaChange() {
enableLightSchemaChange = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE,
"false"));
@@ -574,7 +595,8 @@ public class TableProperty implements Writable {
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
.buildTimeSeriesCompactionEmptyRowsetsThreshold()
- .buildTimeSeriesCompactionLevelThreshold();
+ .buildTimeSeriesCompactionLevelThreshold()
+ .buildInAtomicRestore();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation
String repNum =
tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 201001672b4..1bccf464c2f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -70,6 +70,7 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_VERSION_INFO = "version_info";
// for restore
public static final String PROPERTIES_SCHEMA_VERSION = "schema_version";
+ public static final String PROPERTIES_IN_ATOMIC_RESTORE =
"in_atomic_restore";
public static final String PROPERTIES_BF_COLUMNS = "bloom_filter_columns";
public static final String PROPERTIES_BF_FPP = "bloom_filter_fpp";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index fc9de17505a..d41d11252eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -51,6 +51,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.backup.RestoreJob;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.ColocateGroupSchema;
@@ -903,10 +904,16 @@ public class InternalCatalog implements
CatalogIf<Database> {
OlapTable olapTable = (OlapTable) table;
if ((olapTable.getState() != OlapTableState.NORMAL)) {
throw new DdlException("The table [" + tableName + "]'s
state is " + olapTable.getState()
- + ", cannot be dropped." + " please cancel the
operation on olap table firstly."
+ + ", cannot be dropped. please cancel the
operation on olap table firstly."
+ " If you want to forcibly drop(cannot be
recovered),"
+ " please use \"DROP table FORCE\".");
}
+ if (olapTable.isInAtomicRestore()) {
+ throw new DdlException("The table [" + tableName + "]'s
state is in atomic restore"
+ + ", cannot be dropped. please cancel the restore
operation on olap table"
+ + " firstly. If you want to forcibly drop(cannot
be recovered),"
+ + " please use \"DROP table FORCE\".");
+ }
}
dropTableInternal(db, table, stmt.isForceDrop());
@@ -1143,6 +1150,11 @@ public class InternalCatalog implements
CatalogIf<Database> {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}
+ if
(db.getTable(RestoreJob.tableAliasWithAtomicRestore(tableName)).isPresent()) {
+ ErrorReport.reportDdlException(
+ "table[{}] is in atomic restore, please cancel the restore
operation firstly",
+ ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
if (engineName.equals("olap")) {
return createOlapTable(db, stmt);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7f81104d943..ff25e8c1934 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2917,6 +2917,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
if (request.isCleanTables()) {
properties.put(RestoreStmt.PROP_CLEAN_TABLES, "true");
}
+ if (request.isAtomicRestore()) {
+ properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
+ }
AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
index 71b3570f288..81177305683 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SnapshotTask.java
@@ -29,6 +29,7 @@ public class SnapshotTask extends AgentTask {
private int schemaHash;
private long timeoutMs;
private boolean isRestoreTask;
+ private Long refTabletId;
// Set to true if this task for AdminCopyTablet.
// Otherwise, it is for Backup/Restore operation.
@@ -98,13 +99,23 @@ public class SnapshotTask extends AgentTask {
return resultSnapshotPath;
}
+ public void setRefTabletId(long refTabletId) {
+ assert refTabletId > 0;
+ this.refTabletId = refTabletId;
+ }
+
public TSnapshotRequest toThrift() {
TSnapshotRequest request = new TSnapshotRequest(tabletId, schemaHash);
- request.setVersion(version);
request.setListFiles(true);
request.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION);
request.setTimeout(timeoutMs / 1000);
request.setIsCopyTabletTask(isCopyTabletTask);
+ if (refTabletId != null) {
+ request.setRefTabletId(refTabletId);
+ }
+ if (version > 0L) {
+ request.setVersion(version);
+ }
return request;
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
index d37a63f6d14..85de627fa44 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreFileMappingTest.java
@@ -31,14 +31,14 @@ public class RestoreFileMappingTest {
@Before
public void setUp() {
- src = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L);
- dest = new IdChain(10004L, 10003L, 10004L, 10007L, -1L);
+ src = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L, -1L);
+ dest = new IdChain(10004L, 10003L, 10004L, 10007L, -1L, -1L);
fileMapping.putMapping(src, dest, true);
}
@Test
public void test() {
- IdChain key = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L);
+ IdChain key = new IdChain(10005L, 10006L, 10005L, 10007L, 10008L, -1L);
Assert.assertEquals(key, src);
Assert.assertEquals(src, key);
IdChain val = fileMapping.get(key);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 8026d47741c..7e8e55eea32 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -251,7 +251,8 @@ public class RestoreJobTest {
db.dropTable(expectedRestoreTbl.getName());
job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(),
db.getFullName(), jobInfo, false,
- new ReplicaAllocation((short) 3), 100000, -1, false, false,
false, false, false, env, repo.getId());
+ new ReplicaAllocation((short) 3), 100000, -1, false, false,
false, false, false, false,
+ env, repo.getId());
List<Table> tbls = Lists.newArrayList();
List<Resource> resources = Lists.newArrayList();
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index 79d3eb158d1..9d9693c5d94 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -344,6 +344,7 @@ struct TSnapshotRequest {
11: optional Types.TVersion start_version
12: optional Types.TVersion end_version
13: optional bool is_copy_binlog
+ 14: optional Types.TTabletId ref_tablet_id
}
struct TReleaseSnapshotRequest {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 69aa37964dc..c391c802a03 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1104,6 +1104,7 @@ struct TRestoreSnapshotRequest {
12: optional binary job_info
13: optional bool clean_tables
14: optional bool clean_partitions
+ 15: optional bool atomic_restore
}
struct TRestoreSnapshotResult {
diff --git a/regression-test/data/backup_restore/test_backup_restore_atomic.out
b/regression-test/data/backup_restore/test_backup_restore_atomic.out
new file mode 100644
index 00000000000..bee7a4da44f
--- /dev/null
+++ b/regression-test/data/backup_restore/test_backup_restore_atomic.out
@@ -0,0 +1,78 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+10 10
+20 20
+30 30
+40 40
+50 50
+60 60
+70 70
+80 80
+90 90
+100 100
+
+-- !sql --
+10 10
+20 20
+
+-- !sql --
+10 10
+20 20
+30 30
+40 40
+50 50
+60 60
+70 70
+80 80
+90 90
+100 100
+
+-- !sql --
+10 10
+20 20
+30 30
+40 40
+50 50
+60 60
+70 70
+80 80
+90 90
+100 100
+
+-- !sql --
+10 10
+20 20
+30 30
+40 40
+50 50
+60 60
+70 70
+80 80
+90 90
+100 100
+
+-- !sql --
+10 10
+20 20
+30 30
+40 40
+50 50
+60 60
+70 70
+80 80
+90 90
+100 100
+
+-- !sql --
+10 20
+20 40
+30 60
+40 80
+50 100
+60 120
+70 140
+80 160
+90 180
+100 200
+200 200
+
diff --git
a/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out
b/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out
new file mode 100644
index 00000000000..cad6dbe8fd8
--- /dev/null
+++
b/regression-test/data/backup_restore/test_backup_restore_atomic_with_view.out
@@ -0,0 +1,60 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1
+2 2
+3 3
+4 4
+5 5
+6 6
+7 7
+8 8
+9 9
+10 10
+
+-- !sql --
+6 6
+7 7
+8 8
+9 9
+10 10
+
+-- !sql --
+1 1
+2 2
+3 3
+4 4
+5 5
+6 6
+7 7
+8 8
+9 9
+10 10
+
+-- !sql --
+6 6
+7 7
+8 8
+9 9
+10 10
+
+-- !sql --
+1 1
+2 2
+3 3
+4 4
+5 5
+6 6
+7 7
+8 8
+9 9
+10 10
+11 11
+
+-- !sql --
+6 6
+7 7
+8 8
+9 9
+10 10
+11 11
+
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 9a14346af10..9a688a1b4d1 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -532,6 +532,16 @@ class Suite implements GroovyInterceptable {
runAction(new WaitForAction(context), actionSupplier)
}
+ void expectExceptionLike(Closure userFunction, String errorMessage = null)
{
+ try {
+ userFunction()
+ } catch (Exception e) {
+ if (!e.getMessage().contains(errorMessage)) {
+ throw e
+ }
+ }
+ }
+
String getBrokerName() {
String brokerName = context.config.otherConfigs.get("brokerName")
return brokerName
diff --git
a/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy
b/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy
new file mode 100644
index 00000000000..4b87340fb35
--- /dev/null
+++ b/regression-test/suites/backup_restore/test_backup_restore_atomic.groovy
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic", "backup_restore") {
+ String suiteName = "test_backup_restore_atomic"
+ String dbName = "${suiteName}_db_1"
+ String dbName1 = "${suiteName}_db_2"
+ String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ String snapshotName = "${suiteName}_snapshot"
+ String tableNamePrefix = "${suiteName}_tables"
+
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
+
+ // 1. restore to not exists table_0
+ // 2. restore partial data to table_1
+ // 3. restore less data to table_2
+ // 4. restore incremental data to table_3
+ int numTables = 4;
+ List<String> tables = []
+ for (int i = 0; i < numTables; ++i) {
+ String tableName = "${tableNamePrefix}_${i}"
+ tables.add(tableName)
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION p1 VALUES LESS THAN ("10"),
+ PARTITION p2 VALUES LESS THAN ("20"),
+ PARTITION p3 VALUES LESS THAN ("30"),
+ PARTITION p4 VALUES LESS THAN ("40"),
+ PARTITION p5 VALUES LESS THAN ("50"),
+ PARTITION p6 VALUES LESS THAN ("60"),
+ PARTITION p7 VALUES LESS THAN ("120")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ }
+
+ // 5. the len of table name equals to the config table_name_length_limit
+ def maxLabelLen = getFeConfig("table_name_length_limit").toInteger()
+ def maxTableName = "".padRight(maxLabelLen, "x")
+ logger.info("config table_name_length_limit = ${maxLabelLen}, table name =
${maxTableName}")
+ sql "DROP TABLE IF EXISTS ${dbName}.${maxTableName}"
+ sql """
+ CREATE TABLE ${dbName}.${maxTableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION p1 VALUES LESS THAN ("10"),
+ PARTITION p2 VALUES LESS THAN ("20"),
+ PARTITION p3 VALUES LESS THAN ("30"),
+ PARTITION p4 VALUES LESS THAN ("40"),
+ PARTITION p5 VALUES LESS THAN ("50"),
+ PARTITION p6 VALUES LESS THAN ("60"),
+ PARTITION p7 VALUES LESS THAN ("120")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ tables.add(maxTableName)
+
+ int numRows = 10;
+ List<String> values = []
+ for (int j = 1; j <= numRows; ++j) {
+ values.add("(${j}0, ${j}0)")
+ }
+
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${maxTableName} VALUES ${values.join(",")}"
+
+ // the other partitions of table_1 will be dropped
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ ON (
+ ${tableNamePrefix}_0,
+ ${tableNamePrefix}_1 PARTITION (p1, p2, p3),
+ ${tableNamePrefix}_2,
+ ${tableNamePrefix}_3,
+ ${maxTableName}
+ )
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null)
+
+ // drop table_0
+ sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE"
+
+ // insert extra data into table_2
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}"
+
+ sql "TRUNCATE TABLE ${dbName}.${tableNamePrefix}_3"
+
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true",
+ "atomic_restore" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName)
+
+ for (def tableName in tables) {
+ qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id"
+ }
+
+ // restore table_3 to new db
+ sql """
+ RESTORE SNAPSHOT ${dbName1}.${snapshotName}
+ FROM `${repoName}`
+ ON (${tableNamePrefix}_3)
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true",
+ "atomic_restore" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName1)
+
+ qt_sql "SELECT * FROM ${dbName1}.${tableNamePrefix}_3 ORDER BY id"
+
+ // add partition and insert some data.
+ sql "ALTER TABLE ${dbName}.${tableNamePrefix}_3 ADD PARTITION p8 VALUES
LESS THAN MAXVALUE"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_3 VALUES (200, 200)"
+
+ // backup again
+ snapshotName = "${snapshotName}_1"
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ ON (${tableNamePrefix}_3)
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null)
+
+ // restore with incremental data
+ sql """
+ RESTORE SNAPSHOT ${dbName1}.${snapshotName}
+ FROM `${repoName}`
+ ON (${tableNamePrefix}_3)
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true",
+ "atomic_restore" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName1)
+
+ qt_sql "SELECT * FROM ${dbName1}.${tableNamePrefix}_3 ORDER BY id"
+
+ for (def tableName in tables) {
+ sql "DROP TABLE ${dbName}.${tableName} FORCE"
+ }
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP DATABASE ${dbName1} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+}
+
+
diff --git
a/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy
b/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy
new file mode 100644
index 00000000000..3487c93b0d6
--- /dev/null
+++
b/regression-test/suites/backup_restore/test_backup_restore_atomic_cancel.groovy
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic_cancel") {
+ String suiteName = "test_backup_restore_atomic_cancelled"
+ String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ String dbName = "${suiteName}_db"
+ String tableName = "${suiteName}_table"
+ String tableName1 = "${suiteName}_table_1"
+ String viewName = "${suiteName}_view"
+ String snapshotName = "${suiteName}_snapshot"
+
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0")
+ AGGREGATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName1} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0")
+ AGGREGATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ sql "DROP VIEW IF EXISTS ${dbName}.${viewName}"
+ sql """
+ CREATE VIEW ${dbName}.${viewName}
+ AS
+ SELECT id, count FROM ${dbName}.${tableName}
+ WHERE id > 5
+ """
+
+ List<String> values = []
+ for (int i = 1; i <= 10; ++i) {
+ values.add("(${i}, ${i})")
+ }
+ sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+ def result = sql "SELECT * FROM ${dbName}.${tableName}"
+ assertEquals(result.size(), values.size());
+
+ sql "INSERT INTO ${dbName}.${tableName1} VALUES ${values.join(",")}"
+ result = sql "SELECT * FROM ${dbName}.${tableName1}"
+ assertEquals(result.size(), values.size());
+
+
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ // alter the view and then restore; the restore must fail because the
signatures do not match
+
+ sql """
+ ALTER VIEW ${dbName}.${viewName}
+ AS
+ SELECT id,count FROM ${dbName}.${tableName}
+ WHERE id < 100
+
+ """
+
+ sql "INSERT INTO ${dbName}.${tableName} VALUES (11, 11)"
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true",
+ "atomic_restore" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName)
+
+ def restore_result = sql_return_maparray """ SHOW RESTORE FROM ${dbName}
WHERE Label ="${snapshotName}" """
+ restore_result.last()
+ logger.info("show restore result: ${restore_result}")
+ assertTrue(restore_result.last().State == "CANCELLED")
+
+
+ // The cancelled restore must not affect any existing tables.
+ result = sql "SELECT * FROM ${dbName}.${tableName}"
+ assertEquals(result.size(), values.size() + 1);
+
+ result = sql "SELECT * FROM ${dbName}.${tableName1}"
+ assertEquals(result.size(), values.size());
+
+ sql "DROP TABLE ${dbName}.${tableName} FORCE"
+ sql "DROP TABLE ${dbName}.${tableName1} FORCE"
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+}
+
+
diff --git
a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
new file mode 100644
index 00000000000..46a3ca5b29d
--- /dev/null
+++
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic_with_alter", "backup_restore") {
+ if (!getFeConfig("enable_debug_points").equals("true")) {
+ logger.info("Config.enable_debug_points=true is required")
+ return
+ }
+
+ String suiteName = "test_backup_restore_atomic_with_alter"
+ String dbName = "${suiteName}_db"
+ String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ String snapshotName = "snapshot_" +
UUID.randomUUID().toString().replace("-", "")
+ String tableNamePrefix = "${suiteName}_tables"
+
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ sql "DROP DATABASE IF EXISTS ${dbName} FORCE"
+ sql "CREATE DATABASE ${dbName}"
+
+ // during restoring, if:
+ // 1. table_0 not exists, create table_0 is not allowed
+ // 2. table_1 exists, alter operation is not allowed
+ // 3. table_1 exists, drop table is not allowed
+ // 4. table_0 not exists, rename table_2 to table_0 is not allowed
+ int numTables = 3;
+ List<String> tables = []
+ for (int i = 0; i < numTables; ++i) {
+ String tableName = "${tableNamePrefix}_${i}"
+ tables.add(tableName)
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION p1 VALUES LESS THAN ("10"),
+ PARTITION p2 VALUES LESS THAN ("20"),
+ PARTITION p3 VALUES LESS THAN ("30"),
+ PARTITION p4 VALUES LESS THAN ("40"),
+ PARTITION p5 VALUES LESS THAN ("50"),
+ PARTITION p6 VALUES LESS THAN ("60"),
+ PARTITION p7 VALUES LESS THAN ("120")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ }
+
+ int numRows = 10;
+ List<String> values = []
+ for (int j = 1; j <= numRows; ++j) {
+ values.add("(${j}0, ${j}0)")
+ }
+
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}"
+ sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}"
+
+ // only backup table 0,1
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ ON (
+ ${tableNamePrefix}_0,
+ ${tableNamePrefix}_1
+ )
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null)
+
+ // drop table_0
+ sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE"
+
+ // disable restore
+
GetDebugPoint().enableDebugPointForAllFEs("FE.PAUSE_NON_PENDING_RESTORE_JOB",
[value:snapshotName])
+
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true",
+ "atomic_restore" = "true"
+ )
+ """
+
+ boolean restore_paused = false
+ for (int k = 0; k < 60; k++) {
+ def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName}
WHERE Label = "${snapshotName}" """
+ if (records.size() == 1 && records[0].State != 'PENDING') {
+ restore_paused = true
+ break
+ }
+ logger.info("SHOW RESTORE result: ${records}")
+ sleep(3000)
+ }
+ assertTrue(restore_paused)
+
+ // 0. table_1 has in_atomic_restore property
+ def show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1
"""
+ logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}")
+ assertTrue(show_result[0][1].contains("in_atomic_restore"))
+
+ // 1. create a restoring table (not exists before)
+ expectExceptionLike({ ->
+ sql """
+ CREATE TABLE ${dbName}.${tableNamePrefix}_0
+ (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION p1 VALUES LESS THAN ("10"),
+ PARTITION p2 VALUES LESS THAN ("20"),
+ PARTITION p3 VALUES LESS THAN ("30"),
+ PARTITION p4 VALUES LESS THAN ("40"),
+ PARTITION p5 VALUES LESS THAN ("50"),
+ PARTITION p6 VALUES LESS THAN ("60"),
+ PARTITION p7 VALUES LESS THAN ("120")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ }, "is in atomic restore, please cancel the restore operation firstly")
+
+ // 2. alter is not allowed
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ ADD PARTITION p8 VALUES LESS THAN("200")
+ """
+ }, "Do not allow doing ALTER ops")
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ DROP PARTITION p1
+ """
+ }, "Do not allow doing ALTER ops")
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ MODIFY PARTITION p1 SET ("key"="value")
+ """
+ }, "Do not allow doing ALTER ops")
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ ADD COLUMN new_col INT DEFAULT "0" AFTER count
+ """
+ }, "Do not allow doing ALTER ops")
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ DROP COLUMN count
+ """
+ }, "Do not allow doing ALTER ops")
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ SET ("is_being_synced"="false")
+ """
+ }, "Do not allow doing ALTER ops")
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_1
+ RENAME newTableName
+ """
+ }, "Do not allow doing ALTER ops")
+ // BTW, renaming the tmp table is not allowed either
+ expectExceptionLike({
+ sql """
+ ALTER TABLE
${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1
+ RENAME newTableName
+ """
+ }, "Do not allow doing ALTER ops")
+ // 3. drop table is not allowed
+ expectExceptionLike({
+ sql """
+ DROP TABLE ${dbName}.${tableNamePrefix}_1
+ """
+ }, "state is in atomic restore")
+ expectExceptionLike({
+ sql """
+ DROP TABLE
${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1
+ """
+ }, "state is RESTORE")
+ // 4. the table name is occupied
+ expectExceptionLike({
+ sql """
+ ALTER TABLE ${dbName}.${tableNamePrefix}_2
+ RENAME ${tableNamePrefix}_0
+ """
+ }, "is already used (in restoring)")
+
+
+ sql "CANCEL RESTORE FROM ${dbName}"
+
+ // 5. The restore job is cancelled, the in_atomic_restore property has
been removed.
+ show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """
+ logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}")
+ assertFalse(show_result[0][1].contains("in_atomic_restore"))
+
+ for (def tableName in tables) {
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE"
+ }
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+}
+
+
+
diff --git
a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy
new file mode 100644
index 00000000000..9d090281364
--- /dev/null
+++
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_view.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_atomic_with_view", "backup_restore") {
+ String suiteName = "backup_restore_atomic_with_view"
+ String dbName = "${suiteName}_db"
+ String dbName1 = "${suiteName}_db_1"
+ String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ String snapshotName = "${suiteName}_snapshot"
+ String tableName = "${suiteName}_table"
+ String viewName = "${suiteName}_view"
+
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
+
+ int numRows = 10;
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE"
+ sql "DROP VIEW IF EXISTS ${dbName}.${viewName}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ List<String> values = []
+ for (int j = 1; j <= numRows; ++j) {
+ values.add("(${j}, ${j})")
+ }
+ sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+
+ sql """CREATE VIEW ${dbName}.${viewName} (id, count)
+ AS
+ SELECT * FROM ${dbName}.${tableName} WHERE count > 5
+ """
+
+ qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id ASC"
+ qt_sql "SELECT * FROM ${dbName}.${viewName} ORDER BY id ASC"
+
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null)
+
+ // restore new view
+ sql "DROP TABLE IF EXISTS ${dbName1}.${tableName} FORCE"
+ sql "DROP VIEW IF EXISTS ${dbName1}.${viewName}"
+
+ sql """
+ RESTORE SNAPSHOT ${dbName1}.${snapshotName}
+ FROM `${repoName}`
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "atomic_restore" = "true",
+ "reserve_replica" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName1)
+
+ qt_sql "SELECT * FROM ${dbName1}.${tableName} ORDER BY id ASC"
+ qt_sql "SELECT * FROM ${dbName1}.${viewName} ORDER BY id ASC"
+ def show_view_result = sql_return_maparray "SHOW VIEW FROM ${tableName}
FROM ${dbName1}"
+ logger.info("show view result: ${show_view_result}")
+ assertTrue(show_view_result.size() == 1);
+ def show_view = show_view_result[0]['Create View']
+ assertTrue(show_view.contains("${dbName1}"))
+ assertTrue(show_view.contains("${tableName}"))
+
+ // restore an exists view
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "atomic_restore" = "true",
+ "reserve_replica" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName)
+ def restore_result = sql_return_maparray """ SHOW RESTORE FROM ${dbName}
WHERE Label ="${snapshotName}" """
+ restore_result.last()
+ logger.info("show restore result: ${restore_result}")
+ assertTrue(restore_result.last().State == "FINISHED")
+
+ // View could read the incremental data.
+ sql "INSERT INTO ${dbName}.${tableName} VALUES (11, 11)"
+
+ qt_sql "SELECT * FROM ${dbName}.${tableName} ORDER BY id ASC"
+ qt_sql "SELECT * FROM ${dbName}.${viewName} ORDER BY id ASC"
+
+ sql "DROP REPOSITORY `${repoName}`"
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]