This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4cec7913e79 [bugfix](backup)(cooldown) cancel backup properly when be
backup failed (#38724)
4cec7913e79 is described below
commit 4cec7913e792be18f2a9f9be5df82ab485e5e1a4
Author: zhangyuan <[email protected]>
AuthorDate: Wed Aug 7 09:52:53 2024 +0800
[bugfix](backup)(cooldown) cancel backup properly when be backup failed
(#38724)
Currently, when a backup job fails on the BE, it still stays in the
SNAPSHOTING state.
Cancel the backup job properly when the BE backup fails.
---
be/src/common/status.cpp | 7 +
be/src/olap/snapshot_manager.cpp | 3 +
be/src/olap/tablet.cpp | 8 +
.../java/org/apache/doris/backup/BackupJob.java | 69 +++++++
.../backup_restore/test_backup_cancelled.groovy | 199 +++++++++++++++++++++
5 files changed, 286 insertions(+)
diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp
index d17e18951c5..cc6c10c2941 100644
--- a/be/src/common/status.cpp
+++ b/be/src/common/status.cpp
@@ -34,6 +34,13 @@ void Status::to_thrift(TStatus* s) const {
// << "The error code has to > 0 because TStatusCode need it > 0,
it's actual value is "
// << _code;
s->status_code = (int16_t)_code > 0 ? (TStatusCode::type)_code :
TStatusCode::INTERNAL_ERROR;
+
+ if (_code == ErrorCode::VERSION_ALREADY_MERGED) {
+ s->status_code = TStatusCode::OLAP_ERR_VERSION_ALREADY_MERGED;
+ } else if (_code == ErrorCode::TABLE_NOT_FOUND) {
+ s->status_code = TStatusCode::TABLET_MISSING;
+ }
+
s->error_msgs.push_back(fmt::format("({})[{}]{}",
BackendOptions::get_localhost(),
code_as_string(), _err_msg ?
_err_msg->_msg : ""));
s->__isset.error_msgs = true;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index ac4f5ee9728..1aa0229ee65 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -83,6 +83,9 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest&
request, string* s
}
TabletSharedPtr ref_tablet =
_engine.tablet_manager()->get_tablet(request.tablet_id);
+
+ DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", {
ref_tablet = nullptr; })
+
if (ref_tablet == nullptr) {
return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", request.tablet_id);
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1a1d3be6bc9..e3032be259f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -861,6 +861,14 @@ Status Tablet::capture_consistent_versions_unlocked(const
Version& spec_version,
}
}
}
+
+ // Debug point for regression tests: when it is enabled with a "tablet_id"
+ // parameter equal to this tablet's id, overwrite the result with
+ // VERSION_ALREADY_MERGED. The name has to be exactly the string that
+ // test_backup_cancelled.groovy passes to enableDebugPointForAllBEs
+ // ("Tablet::...", single T), otherwise the injection never triggers.
+ DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
+     auto tablet_id = dp->param<int64>("tablet_id", -1);
+     // -1 is the default when no tablet_id parameter was supplied; never inject then.
+     if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) {
+         status = Status::Error<VERSION_ALREADY_MERGED>("version already merged");
+     }
+ });
+
return status;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index c706fb70753..d94513efa9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -169,6 +169,61 @@ public class BackupJob extends AbstractJob {
return BackupContent.ALL;
}
+ /**
+  * Re-issues the MAKE_SNAPSHOT task of a tablet whose previous snapshot task failed.
+  *
+  * <p>Under the table read lock this resolves the partition, index and tablet named by
+  * {@code task}, picks a replica with {@code chooseReplica}, discards the bookkeeping of
+  * the old task and submits a new {@link SnapshotTask} to the chosen replica's backend.
+  *
+  * @param task the snapshot task to retry
+  * @return {@code true} if a replacement task was submitted; {@code false} if the table is
+  *         missing or is not an {@link OlapTable}, or if the partition, index, tablet or a
+  *         usable replica cannot be found. No job state is modified when {@code false}
+  *         is returned.
+  */
+ private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) {
+     Table table = env.getInternalCatalog().getTableByTableId(task.getTableId());
+     // instanceof is false for null as well, so this covers the "table gone" case and
+     // keeps a non-OLAP table from raising ClassCastException on the downcast below.
+     if (!(table instanceof OlapTable)) {
+         return false;
+     }
+     OlapTable tbl = (OlapTable) table;
+     tbl.readLock();
+     try {
+         if (tbl.getId() != task.getTableId()) {
+             return false;
+         }
+         Partition partition = tbl.getPartition(task.getPartitionId());
+         if (partition == null) {
+             return false;
+         }
+         MaterializedIndex index = partition.getIndex(task.getIndexId());
+         if (index == null) {
+             return false;
+         }
+         Tablet tablet = index.getTablet(task.getTabletId());
+         if (tablet == null) {
+             return false;
+         }
+         Replica replica = chooseReplica(tablet, task.getVersion());
+         if (replica == null) {
+             return false;
+         }
+
+         // Clear the old task's bookkeeping before registering the replacement.
+         AgentTaskQueue.removeTaskOfType(TTaskType.MAKE_SNAPSHOT, task.getTabletId());
+         unfinishedTaskIds.remove(task.getTabletId());
+         taskProgress.remove(task.getTabletId());
+         taskErrMsg.remove(task.getTabletId());
+
+         SnapshotTask newTask = new SnapshotTask(null, replica.getBackendId(), task.getTabletId(),
+                 task.getJobId(), task.getDbId(), tbl.getId(), task.getPartitionId(),
+                 task.getIndexId(), task.getTabletId(), task.getVersion(),
+                 task.getSchemaHash(), timeoutMs, false /* not restore task */);
+         AgentBatchTask batchTask = new AgentBatchTask();
+         batchTask.addTask(newTask);
+         // Track the tablet as unfinished again, now against the newly chosen backend.
+         unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
+
+         // Send the task.
+         AgentTaskQueue.addTask(newTask);
+         AgentTaskExecutor.submit(batchTask);
+     } finally {
+         tbl.readUnlock();
+     }
+
+     return true;
+ }
+
+
public synchronized boolean finishTabletSnapshotTask(SnapshotTask task,
TFinishTaskRequest request) {
Preconditions.checkState(task.getJobId() == jobId);
@@ -181,6 +236,20 @@ public class BackupJob extends AbstractJob {
"make snapshot failed, version already merged");
cancelInternal();
}
+
+ // The BE reported TABLET_MISSING: try to re-issue the snapshot task via
+ // tryNewTabletSnapshotTask(); cancel the job only if no replacement task could be sent.
+ if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
+         && !tryNewTabletSnapshotTask(task)) {
+     status = new Status(ErrCode.NOT_FOUND,
+             "make snapshot failed, failed to get tablet, table will be dropped or truncated");
+     cancelInternal();
+ }
+
+ // NOT_IMPLEMENTED_ERROR is not retried; the job is cancelled right away with a message
+ // saying that tablets with cooled-down remote data cannot be backed up.
+ if (request.getTaskStatus().getStatusCode() == TStatusCode.NOT_IMPLEMENTED_ERROR) {
+     status = new Status(ErrCode.COMMON_ERROR,
+             "make snapshot failed, currently not support backup tablet with cooldowned remote data");
+     cancelInternal();
+ }
+
return false;
}
diff --git a/regression-test/suites/backup_restore/test_backup_cancelled.groovy
b/regression-test/suites/backup_restore/test_backup_cancelled.groovy
new file mode 100644
index 00000000000..8a472ca9e26
--- /dev/null
+++ b/regression-test/suites/backup_restore/test_backup_cancelled.groovy
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_cancelled", "backup_cancelled") { // Backs up a table while BE debug points are enabled, then restores it from the first snapshot.
+ String suiteName = "test_backup_cancelled"
+ String repoName = "${suiteName}_repo"
+ String dbName = "${suiteName}_db"
+ String tableName = "${suiteName}_table"
+ String snapshotName = "${suiteName}_snapshot"
+ String snapshotName_1 = "${suiteName}_snapshot1"
+ // Create the repository that both BACKUP statements below write to.
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ // Recreate a 2-bucket aggregate table with replication_num = 1.
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0")
+ AGGREGATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+ // Insert 10 rows and check that all of them can be read back.
+ List<String> values = []
+ for (int i = 1; i <= 10; ++i) {
+ values.add("(${i}, ${i})")
+ }
+ sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
+ def result = sql "SELECT * FROM ${dbName}.${tableName}"
+ assertEquals(result.size(), values.size());
+ // Take the id of the first tablet listed; it is passed to the debug points below.
+ result = sql_return_maparray """show tablets from ${dbName}.${tableName}"""
+ assertNotNull(result)
+ def tabletId = null
+ for (def res : result) {
+ tabletId = res.TabletId
+ break
+ }
+
+ // Case 1: enable the make_snapshot debug point to simulate "failed to get tablet", as when the table is truncated or dropped.
+ // NOTE(review): the debug point stays enabled if a later statement throws - consider try/finally.
+
GetDebugPoint().enableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure",
[tablet_id:"${tabletId}", execute:3]);
+ // Start the first backup while the debug point is enabled, then block on waitSnapshotFinish.
+
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ ON (${tableName})
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+
+
GetDebugPoint().disableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure")
+ // NOTE(review): nothing asserts the state of the backup above (cancelled vs. retried) - TODO confirm intent.
+
+
+
+ // Case 2: enable the capture_consistent_versions debug point to simulate missing versions, as after compaction or balance.
+ // NOTE(review): this name must match the DBUG_EXECUTE_IF name in be/src/olap/tablet.cpp exactly - verify.
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure",
[tablet_id:"${tabletId}", execute:1]);
+
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName_1}
+ TO `${repoName}`
+ ON (${tableName})
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+
GetDebugPoint().disableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure");
+ // The first snapshot must have a timestamp in the repository; the table is then truncated and restored from it.
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null)
+
+ sql "TRUNCATE TABLE ${dbName}.${tableName}"
+
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ ON ( `${tableName}`)
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true"
+ )
+ """
+
+ syncer.waitAllRestoreFinish(dbName)
+ // After the restore the table must hold as many rows as were inserted.
+ result = sql "SELECT * FROM ${dbName}.${tableName}"
+ assertEquals(result.size(), values.size());
+ // Cleanup: table, database and repository.
+ sql "DROP TABLE ${dbName}.${tableName} FORCE"
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+}
+
+
+suite("test_backup_cooldown_cancelled", "backup_cooldown_cancelled") {
+ // Backs up a table that is bound to an S3 storage policy, then drops everything it created.
+ String suiteName = "test_backup_cooldown_cancelled"
+ String resource_name = "resource_${suiteName}"
+ String policy_name= "policy_${suiteName}"
+ String dbName = "${suiteName}_db"
+ String tableName = "${suiteName}_table"
+ String snapshotName = "${suiteName}_snapshot"
+ String repoName = "${suiteName}_repo"
+ // Repository that the BACKUP statement below targets.
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ // Create the S3 resource and a storage policy (cooldown_ttl = 300) on top of it.
+
+
+ sql """
+ CREATE RESOURCE IF NOT EXISTS "${resource_name}"
+ PROPERTIES(
+ "type"="s3",
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_REGION" = "${getS3Region()}",
+ "AWS_ROOT_PATH" = "regression/cooldown",
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_MAX_CONNECTIONS" = "50",
+ "AWS_REQUEST_TIMEOUT_MS" = "3000",
+ "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+ "AWS_BUCKET" = "${getS3BucketName()}",
+ "s3_validity_check" = "true"
+ );
+ """
+
+ sql """
+ CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
+ PROPERTIES(
+ "storage_resource" = "${resource_name}",
+ "cooldown_ttl" = "300"
+ )
+ """
+
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
+ // 3-bucket duplicate-key table created with the storage policy; this suite inserts no rows into it.
+ sql """
+ CREATE TABLE ${dbName}.${tableName}
+ (
+ k1 BIGINT,
+ v1 VARCHAR(48)
+ )
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH (k1) BUCKETS 3
+ PROPERTIES(
+ "storage_policy" = "${policy_name}",
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ // NOTE(review): the suite never asserts the resulting backup state - TODO confirm the expected cancellation is verified.
+
+ // Back up the storage-policy table; this backup is expected to be cancelled.
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ ON (${tableName})
+ """
+
+ syncer.waitSnapshotFinish(dbName)
+
+ // Cleanup: table, database and repository first, then the storage policy and its resource.
+ sql "DROP TABLE ${dbName}.${tableName} FORCE"
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+
+ sql """
+ drop storage policy ${policy_name};
+ """
+
+ sql """
+ drop resource ${resource_name};
+ """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]