This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch disable_alter_ops_during_atomic_restore in repository https://gitbox.apache.org/repos/asf/doris.git
commit bc406246e24a3ba8aec0ede744b8c4823cc60473 Author: w41ter <[email protected]> AuthorDate: Fri Sep 13 08:43:22 2024 +0000 [improve](restore) Disable alter ops during atomic restore The atomic restore was introduced in #40353. It replaces tables instead of updating them in place, but the original table might be altered while the restore is in progress, which can cause the restore job to fail. This PR adds an in_atomic_restore property to those tables and forbids altering or dropping the tables involved in the restore, as well as creating or renaming a table to a name that is being restored. --- .../java/org/apache/doris/backup/RestoreJob.java | 26 ++- .../main/java/org/apache/doris/catalog/Env.java | 8 + .../java/org/apache/doris/catalog/OlapTable.java | 19 ++ .../org/apache/doris/catalog/TableProperty.java | 22 ++ .../apache/doris/common/util/PropertyAnalyzer.java | 1 + .../apache/doris/datasource/InternalCatalog.java | 14 +- .../test_backup_restore_atomic_with_alter.groovy | 241 +++++++++++++++++++++ 7 files changed, 324 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 77a5f1c896f..13a6d3a8051 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -63,6 +63,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.TimeUtils; @@ -438,6 +439,12 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { checkIfNeedCancel(); if (status.ok()) { + if (state != RestoreJobState.PENDING && label.equals( + DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) { + LOG.info("pause 
restore job by debug point: {}", this); + return; + } + switch (state) { case PENDING: checkAndPrepareMeta(); @@ -578,6 +585,8 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { if (isAtomicRestore) { // We will create new OlapTable in atomic restore, so does not set the RESTORE state. + // Instead, set table in atomic restore state, to forbid the alter table operation. + olapTbl.setInAtomicRestore(); continue; } @@ -1428,11 +1437,6 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { // replay set all existing tables's state to RESTORE for (String tableName : jobInfo.backupOlapTableObjects.keySet()) { - if (isAtomicRestore) { - // Atomic restore will creates new replica of the OlapTable. - continue; - } - Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName)); if (tbl == null) { continue; @@ -1440,6 +1444,12 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { OlapTable olapTbl = (OlapTable) tbl; tbl.writeLock(); try { + if (isAtomicRestore) { + // Atomic restore will creates new replica of the OlapTable. 
+ olapTbl.setInAtomicRestore(); + continue; + } + olapTbl.setState(OlapTableState.RESTORE); // set restore status for partitions BackupOlapTableInfo tblInfo = jobInfo.backupOlapTableObjects.get(tableName); @@ -2394,6 +2404,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { LOG.info("table {} set state from {} to normal", tableName, olapTbl.getState()); olapTbl.setState(OlapTableState.NORMAL); } + if (olapTbl.isInAtomicRestore()) { + olapTbl.clearInAtomicRestore(); + LOG.info("table {} set state from atomic restore to normal", tableName); + } BackupOlapTableInfo tblInfo = jobInfo.backupOlapTableObjects.get(tableName); for (Map.Entry<String, BackupPartitionInfo> partitionEntry : tblInfo.partitions.entrySet()) { @@ -2537,7 +2551,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { return sb.toString(); } - private String tableAliasWithAtomicRestore(String tableName) { + public static String tableAliasWithAtomicRestore(String tableName) { return ATOMIC_RESTORE_TABLE_PREFIX + tableName; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1293b5465c5..45c13a087b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -79,6 +79,7 @@ import org.apache.doris.analysis.TableRenameClause; import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.UninstallPluginStmt; import org.apache.doris.backup.BackupHandler; +import org.apache.doris.backup.RestoreJob; import org.apache.doris.binlog.BinlogGcer; import org.apache.doris.binlog.BinlogManager; import org.apache.doris.blockrule.SqlBlockRuleMgr; @@ -3681,6 +3682,10 @@ public class Env { .append("\" = \""); sb.append(olapTable.isDuplicateWithoutKey()).append("\""); } + + if (olapTable.isInAtomicRestore()) { + 
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE).append("\" = \"true\""); + } } /** @@ -4707,6 +4712,9 @@ public class Env { if (db.getTable(newTableName).isPresent()) { throw new DdlException("Table name[" + newTableName + "] is already used"); } + if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(newTableName)).isPresent()) { + throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)"); + } if (table.isManagedTable()) { // olap table should also check if any rollup has same name as "newTableName" diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2f5eb35ad75..20737d9a035 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1988,6 +1988,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc throw new DdlException("Table[" + name + "]'s state(" + state.toString() + ") is not NORMAL. Do not allow doing ALTER ops"); } + if (tableProperty != null && tableProperty.isInAtomicRestore()) { + throw new DdlException("Table[" + name + "] is in atomic restore state. 
" + + "Do not allow doing ALTER ops"); + } } public boolean isStable(SystemInfoService infoService, TabletScheduler tabletScheduler) { @@ -2250,6 +2254,21 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc return nameToPartition.containsKey(partitionName); } + public void setInAtomicRestore() { + getOrCreatTableProperty().setInAtomicRestore().buildInAtomicRestore(); + } + + public void clearInAtomicRestore() { + getOrCreatTableProperty().clearInAtomicRestore().buildInAtomicRestore(); + } + + public boolean isInAtomicRestore() { + if (tableProperty != null) { + return tableProperty.isInAtomicRestore(); + } + return false; + } + public long getTTLSeconds() { if (tableProperty != null) { return tableProperty.getTTLSeconds(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index fdb60357f40..1c1d7e35880 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -68,6 +68,7 @@ public class TableProperty implements Writable, GsonPostProcessable { private boolean isInMemory = false; private short minLoadReplicaNum = -1; private long ttlSeconds = 0L; + private boolean isInAtomicRestore = false; private String storagePolicy = ""; private Boolean isBeingSynced = null; @@ -218,6 +219,26 @@ public class TableProperty implements Writable, GsonPostProcessable { return this; } + public TableProperty buildInAtomicRestore() { + isInAtomicRestore = Boolean.parseBoolean(properties.getOrDefault( + PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "false")); + return this; + } + + public boolean isInAtomicRestore() { + return isInAtomicRestore; + } + + public TableProperty setInAtomicRestore() { + properties.put(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE, "true"); + return this; + } + + public TableProperty clearInAtomicRestore() { + 
properties.remove(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE); + return this; + } + public TableProperty buildTTLSeconds() { ttlSeconds = Long.parseLong(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS, "0")); return this; @@ -705,6 +726,7 @@ public class TableProperty implements Writable, GsonPostProcessable { buildTimeSeriesCompactionLevelThreshold(); buildTTLSeconds(); buildVariantEnableFlattenNested(); + buildInAtomicRestore(); if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from property map and create replica allocation diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 62c643df5c6..ce19cf37ce2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -83,6 +83,7 @@ public class PropertyAnalyzer { public static final String PROPERTIES_SCHEMA_VERSION = "schema_version"; public static final String PROPERTIES_PARTITION_ID = "partition_id"; public static final String PROPERTIES_VISIBLE_VERSION = "visible_version"; + public static final String PROPERTIES_IN_ATOMIC_RESTORE = "in_atomic_restore"; public static final String PROPERTIES_BF_COLUMNS = "bloom_filter_columns"; public static final String PROPERTIES_BF_FPP = "bloom_filter_fpp"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index bf19bdc37c1..77fe701f204 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -59,6 +59,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TruncateTableStmt; import 
org.apache.doris.analysis.TypeDef; +import org.apache.doris.backup.RestoreJob; import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.ColocateGroupSchema; @@ -928,10 +929,16 @@ public class InternalCatalog implements CatalogIf<Database> { OlapTable olapTable = (OlapTable) table; if ((olapTable.getState() != OlapTableState.NORMAL)) { throw new DdlException("The table [" + tableName + "]'s state is " + olapTable.getState() - + ", cannot be dropped." + " please cancel the operation on olap table firstly." + + ", cannot be dropped. please cancel the operation on olap table firstly." + " If you want to forcibly drop(cannot be recovered)," + " please use \"DROP table FORCE\"."); } + if (olapTable.isInAtomicRestore()) { + throw new DdlException("The table [" + tableName + "]'s state is in atomic restore" + + ", cannot be dropped. please cancel the restore operation on olap table" + + " firstly. If you want to forcibly drop(cannot be recovered)," + + " please use \"DROP table FORCE\"."); + } } dropTableInternal(db, table, stmt.isForceDrop(), watch, costTimes); @@ -1226,6 +1233,11 @@ public class InternalCatalog implements CatalogIf<Database> { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } } + if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(tableName)).isPresent()) { + ErrorReport.reportDdlException( + "table[{}] is in atomic restore, please cancel the restore operation firstly", + ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } if (engineName.equals("olap")) { return createOlapTable(db, stmt); diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy new file mode 100644 index 00000000000..46a3ca5b29d --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy @@ -0,0 +1,241 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_atomic_with_alter", "backup_restore") { + if (!getFeConfig("enable_debug_points").equals("true")) { + logger.info("Config.enable_debug_points=true is required") + return + } + + String suiteName = "test_backup_restore_atomic_with_alter" + String dbName = "${suiteName}_db" + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "snapshot_" + UUID.randomUUID().toString().replace("-", "") + String tableNamePrefix = "${suiteName}_tables" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + sql "DROP DATABASE IF EXISTS ${dbName} FORCE" + sql "CREATE DATABASE ${dbName}" + + // during restoring, if: + // 1. table_0 not exists, create table_0 is not allowed + // 2. table_1 exists, alter operation is not allowed + // 3. table_1 exists, drop table is not allowed + // 4. 
table_0 not exists, rename table_2 to table_0 is not allowed + int numTables = 3; + List<String> tables = [] + for (int i = 0; i < numTables; ++i) { + String tableName = "${tableNamePrefix}_${i}" + tables.add(tableName) + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION p1 VALUES LESS THAN ("10"), + PARTITION p2 VALUES LESS THAN ("20"), + PARTITION p3 VALUES LESS THAN ("30"), + PARTITION p4 VALUES LESS THAN ("40"), + PARTITION p5 VALUES LESS THAN ("50"), + PARTITION p6 VALUES LESS THAN ("60"), + PARTITION p7 VALUES LESS THAN ("120") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + } + + int numRows = 10; + List<String> values = [] + for (int j = 1; j <= numRows; ++j) { + values.add("(${j}0, ${j}0)") + } + + sql "INSERT INTO ${dbName}.${tableNamePrefix}_0 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_1 VALUES ${values.join(",")}" + sql "INSERT INTO ${dbName}.${tableNamePrefix}_2 VALUES ${values.join(",")}" + + // only backup table 0,1 + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON ( + ${tableNamePrefix}_0, + ${tableNamePrefix}_1 + ) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + + // drop table_0 + sql "DROP TABLE ${dbName}.${tableNamePrefix}_0 FORCE" + + // disable restore + GetDebugPoint().enableDebugPointForAllFEs("FE.PAUSE_NON_PENDING_RESTORE_JOB", [value:snapshotName]) + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "atomic_restore" = "true" + ) + """ + + boolean restore_paused = false + for (int k = 0; k < 60; k++) { + def records = sql_return_maparray """ SHOW 
RESTORE FROM ${dbName} WHERE Label = "${snapshotName}" """ + if (records.size() == 1 && records[0].State != 'PENDING') { + restore_paused = true + break + } + logger.info("SHOW RESTORE result: ${records}") + sleep(3000) + } + assertTrue(restore_paused) + + // 0. table_1 has in_atomic_restore property + def show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """ + logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}") + assertTrue(show_result[0][1].contains("in_atomic_restore")) + + // 1. create a restoring table (not exists before) + expectExceptionLike({ -> + sql """ + CREATE TABLE ${dbName}.${tableNamePrefix}_0 + ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0" + ) + AGGREGATE KEY(`id`) + PARTITION BY RANGE(`id`) + ( + PARTITION p1 VALUES LESS THAN ("10"), + PARTITION p2 VALUES LESS THAN ("20"), + PARTITION p3 VALUES LESS THAN ("30"), + PARTITION p4 VALUES LESS THAN ("40"), + PARTITION p5 VALUES LESS THAN ("50"), + PARTITION p6 VALUES LESS THAN ("60"), + PARTITION p7 VALUES LESS THAN ("120") + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + }, "is in atomic restore, please cancel the restore operation firstly") + + // 2. 
alter is not allowed + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + ADD PARTITION p8 VALUES LESS THAN("200") + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + DROP PARTITION p1 + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + MODIFY PARTITION p1 SET ("key"="value") + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + ADD COLUMN new_col INT DEFAULT "0" AFTER count + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + DROP COLUMN count + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + SET ("is_being_synced"="false") + """ + }, "Do not allow doing ALTER ops") + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_1 + RENAME newTableName + """ + }, "Do not allow doing ALTER ops") + // BTW, the tmp table also don't allow rename + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1 + RENAME newTableName + """ + }, "Do not allow doing ALTER ops") + // 3. drop table is not allowed + expectExceptionLike({ + sql """ + DROP TABLE ${dbName}.${tableNamePrefix}_1 + """ + }, "state is in atomic restore") + expectExceptionLike({ + sql """ + DROP TABLE ${dbName}.__doris_atomic_restore_prefix__${tableNamePrefix}_1 + """ + }, "state is RESTORE") + // 4. the table name is occupied + expectExceptionLike({ + sql """ + ALTER TABLE ${dbName}.${tableNamePrefix}_2 + RENAME ${tableNamePrefix}_0 + """ + }, "is already used (in restoring)") + + + sql "CANCEL RESTORE FROM ${dbName}" + + // 5. The restore job is cancelled, the in_atomic_restore property has been removed. 
+ show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """ + logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}") + assertFalse(show_result[0][1].contains("in_atomic_restore")) + + for (def tableName in tables) { + sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE" + } + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} + + + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
