This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2168f0932f4 [feature](restore) Support clean_tables/clean_partitions properties for restore job #39028 (#39366)
2168f0932f4 is described below
commit 2168f0932f415fd454037d79bf36e97fe0f7d689
Author: walter <[email protected]>
AuthorDate: Thu Aug 15 14:16:02 2024 +0800
[feature](restore) Support clean_tables/clean_partitions properties for restore job #39028 (#39366)
cherry pick from #39028
---
.../org/apache/doris/analysis/RestoreStmt.java | 104 +++++++------
.../org/apache/doris/backup/BackupHandler.java | 4 +-
.../java/org/apache/doris/backup/RestoreJob.java | 97 +++++++++++-
.../java/org/apache/doris/catalog/Partition.java | 4 +-
.../apache/doris/datasource/InternalCatalog.java | 112 +++++++++-----
.../apache/doris/service/FrontendServiceImpl.java | 12 ++
.../org/apache/doris/backup/RestoreJobTest.java | 2 +-
gensrc/thrift/FrontendService.thrift | 2 +
.../test_backup_restore_clean_restore.groovy | 162 +++++++++++++++++++++
9 files changed, 393 insertions(+), 106 deletions(-)
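For context, the two new flags travel through the same RESTORE property map as allow_load, backup_timestamp, and reserve_replica. A minimal sketch of how a caller might populate that map, using the literal keys behind the public RestoreStmt constants added below (the timestamp value and the wrapper class are illustrative assumptions, not Doris code):

    import java.util.HashMap;
    import java.util.Map;

    public class RestorePropertiesSketch {
        public static void main(String[] args) {
            // Keys match RestoreStmt.PROP_CLEAN_TABLES / PROP_CLEAN_PARTITIONS below.
            Map<String, String> properties = new HashMap<>();
            properties.put("backup_timestamp", "2024-08-15-14-16-02"); // placeholder value
            properties.put("reserve_replica", "true");
            properties.put("clean_tables", "true");      // drop tables not covered by the snapshot
            properties.put("clean_partitions", "true");  // drop partitions not covered by the snapshot
            System.out.println(properties);
        }
    }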
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 394b50d8a02..fe66f0ee4cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -35,13 +35,15 @@ import java.util.Set;
public class RestoreStmt extends AbstractBackupStmt {
private static final String PROP_ALLOW_LOAD = "allow_load";
- private static final String PROP_REPLICATION_NUM = "replication_num";
private static final String PROP_BACKUP_TIMESTAMP = "backup_timestamp";
private static final String PROP_META_VERSION = "meta_version";
- private static final String PROP_RESERVE_REPLICA = "reserve_replica";
- private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
+ public static final String PROP_RESERVE_REPLICA = "reserve_replica";
+ public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
+ public static final String PROP_CLEAN_TABLES = "clean_tables";
+ public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
+
private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
private String backupTimestamp = null;
@@ -50,16 +52,18 @@ public class RestoreStmt extends AbstractBackupStmt {
private boolean reserveDynamicPartitionEnable = false;
private boolean isLocal = false;
private boolean isBeingSynced = false;
+ private boolean isCleanTables = false;
+ private boolean isCleanPartitions = false;
private byte[] meta = null;
private byte[] jobInfo = null;
public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
- Map<String, String> properties) {
+ Map<String, String> properties) {
super(labelName, repoName, restoreTableRefClause, properties);
}
public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
- Map<String, String> properties, byte[] meta, byte[] jobInfo) {
+ Map<String, String> properties, byte[] meta, byte[] jobInfo) {
super(labelName, repoName, restoreTableRefClause, properties);
this.meta = meta;
this.jobInfo = jobInfo;
@@ -109,6 +113,14 @@ public class RestoreStmt extends AbstractBackupStmt {
return isBeingSynced;
}
+ public boolean isCleanTables() {
+ return isCleanTables;
+ }
+
+ public boolean isCleanPartitions() {
+ return isCleanPartitions;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
@@ -138,17 +150,7 @@ public class RestoreStmt extends AbstractBackupStmt {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
// allow load
- if (copiedProperties.containsKey(PROP_ALLOW_LOAD)) {
- if (copiedProperties.get(PROP_ALLOW_LOAD).equalsIgnoreCase("true")) {
- allowLoad = true;
- } else if (copiedProperties.get(PROP_ALLOW_LOAD).equalsIgnoreCase("false")) {
- allowLoad = false;
- } else {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
- "Invalid allow load value: " + copiedProperties.get(PROP_ALLOW_LOAD));
- }
- copiedProperties.remove(PROP_ALLOW_LOAD);
- }
+ allowLoad = eatBooleanProperty(copiedProperties, PROP_ALLOW_LOAD, allowLoad);
// replication num
this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(copiedProperties, "");
@@ -156,34 +158,16 @@ public class RestoreStmt extends AbstractBackupStmt {
this.replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}
// reserve replica
- if (copiedProperties.containsKey(PROP_RESERVE_REPLICA)) {
- if (copiedProperties.get(PROP_RESERVE_REPLICA).equalsIgnoreCase("true")) {
- reserveReplica = true;
- } else if (copiedProperties.get(PROP_RESERVE_REPLICA).equalsIgnoreCase("false")) {
- reserveReplica = false;
- } else {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
- "Invalid reserve_replica value: " + copiedProperties.get(PROP_RESERVE_REPLICA));
- }
- // force set reserveReplica to false, do not keep the origin allocation
- if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) {
- reserveReplica = false;
- }
- copiedProperties.remove(PROP_RESERVE_REPLICA);
+ reserveReplica = eatBooleanProperty(copiedProperties, PROP_RESERVE_REPLICA, reserveReplica);
+ // force set reserveReplica to false, do not keep the origin allocation
+ if (reserveReplica && !Config.force_olap_table_replication_allocation.isEmpty()) {
+ reserveReplica = false;
}
+
// reserve dynamic partition enable
- if (copiedProperties.containsKey(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE)) {
- if (copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE).equalsIgnoreCase("true")) {
- reserveDynamicPartitionEnable = true;
- } else if (copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE).equalsIgnoreCase("false")) {
- reserveDynamicPartitionEnable = false;
- } else {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
- "Invalid reserve dynamic partition enable value: "
- + copiedProperties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
- }
- copiedProperties.remove(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE);
- }
+ reserveDynamicPartitionEnable = eatBooleanProperty(
+ copiedProperties, PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, reserveDynamicPartitionEnable);
+
// backup timestamp
if (copiedProperties.containsKey(PROP_BACKUP_TIMESTAMP)) {
backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP);
@@ -207,17 +191,13 @@ public class RestoreStmt extends AbstractBackupStmt {
}
// is being synced
- if (copiedProperties.containsKey(PROP_IS_BEING_SYNCED)) {
- if (copiedProperties.get(PROP_IS_BEING_SYNCED).equalsIgnoreCase("true")) {
- isBeingSynced = true;
- } else if (copiedProperties.get(PROP_IS_BEING_SYNCED).equalsIgnoreCase("false")) {
- isBeingSynced = false;
- } else {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
- "Invalid is being synced value: " + copiedProperties.get(PROP_IS_BEING_SYNCED));
- }
- copiedProperties.remove(PROP_IS_BEING_SYNCED);
- }
+ isBeingSynced = eatBooleanProperty(copiedProperties, PROP_IS_BEING_SYNCED, isBeingSynced);
+
+ // is clean tables
+ isCleanTables = eatBooleanProperty(copiedProperties, PROP_CLEAN_TABLES, isCleanTables);
+
+ // is clean partitions
+ isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions);
if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
@@ -243,4 +223,22 @@ public class RestoreStmt extends AbstractBackupStmt {
sb.append("\n)");
return sb.toString();
}
+
+ private boolean eatBooleanProperty(Map<String, String> copiedProperties, String name, boolean defaultValue)
+ throws AnalysisException {
+ boolean retval = defaultValue;
+ if (copiedProperties.containsKey(name)) {
+ String value = copiedProperties.get(name);
+ if (value.equalsIgnoreCase("true")) {
+ retval = true;
+ } else if (value.equalsIgnoreCase("false")) {
+ retval = false;
+ } else {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+ "Invalid boolean property " + name + " value: " +
value);
+ }
+ copiedProperties.remove(name);
+ }
+ return retval;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 77213a35b42..144b4e49360 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -457,12 +457,12 @@ public class BackupHandler extends MasterDaemon implements Writable {
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
- env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
+ stmt.isCleanTables(), stmt.isCleanPartitions(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
- stmt.isBeingSynced(), env, repository.getId());
+ stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), env, repository.getId());
}
env.getEditLog().logRestoreJob(restoreJob);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 1b6b4b47072..c76ce530141 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -18,6 +18,7 @@
package org.apache.doris.backup;
import org.apache.doris.analysis.BackupStmt.BackupContent;
+import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
@@ -63,6 +64,7 @@ import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.resource.Tag;
import org.apache.doris.task.AgentBatchTask;
@@ -106,9 +108,12 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class RestoreJob extends AbstractJob {
- private static final String PROP_RESERVE_REPLICA = "reserve_replica";
- private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
+ private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
+ private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE =
+ RestoreStmt.PROP_RESERVE_DYNAMIC_PARTITION_ENABLE;
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
+ private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
+ private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;
private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
@@ -173,6 +178,11 @@ public class RestoreJob extends AbstractJob {
private boolean isBeingSynced = false;
+ // Whether to delete existing tables that are not involved in the restore.
+ private boolean isCleanTables = false;
+ // Whether to delete existing partitions that are not involved in the restore.
+ private boolean isCleanPartitions = false;
+
// restore properties
private Map<String, String> properties = Maps.newHashMap();
@@ -182,7 +192,8 @@ public class RestoreJob extends AbstractJob {
public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
- boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId) {
+ boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
+ boolean isCleanPartitions, Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
@@ -197,16 +208,21 @@ public class RestoreJob extends AbstractJob {
}
this.reserveDynamicPartitionEnable = reserveDynamicPartitionEnable;
this.isBeingSynced = isBeingSynced;
+ this.isCleanTables = isCleanTables;
+ this.isCleanPartitions = isCleanPartitions;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
+ properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
+ properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
}
public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
- boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId, BackupMeta backupMeta) {
+ boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
+ boolean isCleanPartitions, Env env, long repoId, BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
- reserveDynamicPartitionEnable, isBeingSynced, env, repoId);
+ reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, env, repoId);
this.backupMeta = backupMeta;
}
@@ -842,7 +858,9 @@ public class RestoreJob extends AbstractJob {
}
if (ok) {
- LOG.debug("finished to create all restored replcias. {}", this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finished to create all restored replicas. {}",
this);
+ }
// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
// added to or removed from this table during the restore process.
@@ -1414,7 +1432,7 @@ public class RestoreJob extends AbstractJob {
return;
}
- Tablet tablet = idx.getTablet(info.getTabletId());
+ Tablet tablet = idx.getTablet(info.getTabletId());
if (tablet == null) {
status = new Status(ErrCode.NOT_FOUND,
"tablet " + info.getTabletId() + " does not exist in restored table "
@@ -1557,7 +1575,7 @@ public class RestoreJob extends AbstractJob {
return;
}
- Tablet tablet = idx.getTablet(info.getTabletId());
+ Tablet tablet = idx.getTablet(info.getTabletId());
if (tablet == null) {
status = new Status(ErrCode.NOT_FOUND,
"tablet " + info.getTabletId() + " does not exist in restored table "
@@ -1761,6 +1779,14 @@ public class RestoreJob extends AbstractJob {
}
}
+ // Drop the existing but non-restored tables/partitions.
+ if (isCleanTables || isCleanPartitions) {
+ Status st = dropAllNonRestoredTableAndPartitions(db);
+ if (!st.ok()) {
+ return st;
+ }
+ }
+
if (!isReplay) {
restoredPartitions.clear();
restoredTbls.clear();
@@ -1783,6 +1809,59 @@ public class RestoreJob extends AbstractJob {
return Status.OK;
}
+ private Status dropAllNonRestoredTableAndPartitions(Database db) {
+ try {
+ for (Table table : db.getTables()) {
+ long tableId = table.getId();
+ String tableName = table.getName();
+ TableType tableType = table.getType();
+ BackupOlapTableInfo backupTableInfo = jobInfo.backupOlapTableObjects.get(tableName);
+ if (tableType != TableType.OLAP && tableType != TableType.ODBC && tableType != TableType.VIEW) {
+ continue;
+ }
+ if (tableType == TableType.OLAP && backupTableInfo != null) {
+ // drop the non-restored partitions.
+ dropNonRestoredPartitions(db, (OlapTable) table, backupTableInfo);
+ } else if (isCleanTables) {
+ // otherwise drop the entire table.
+ LOG.info("drop non restored table {}({}). {}", tableName, tableId, this);
+ boolean isForceDrop = false; // move this table into recyclebin.
+ env.getInternalCatalog().dropTableWithoutCheck(db, table, isForceDrop);
+ }
+ }
+ return Status.OK;
+ } catch (Exception e) {
+ LOG.warn("drop all non restored table and partitions failed. {}",
this, e);
+ return new Status(ErrCode.COMMON_ERROR, e.getMessage());
+ }
+ }
+
+ private void dropNonRestoredPartitions(
+ Database db, OlapTable table, BackupOlapTableInfo backupTableInfo) throws DdlException {
+ if (!isCleanPartitions || !table.writeLockIfExist()) {
+ return;
+ }
+
+ try {
+ long tableId = table.getId();
+ String tableName = table.getQualifiedName();
+ InternalCatalog catalog = env.getInternalCatalog();
+ for (String partitionName : table.getPartitionNames()) {
+ if (backupTableInfo.containsPart(partitionName)) {
+ continue;
+ }
+
+ LOG.info("drop non restored partition {} of table {}({}). {}",
+ partitionName, tableName, tableId, this);
+ boolean isTempPartition = false;
+ boolean isForceDrop = false; // move this partition into recyclebin.
+ catalog.dropPartitionWithoutCheck(db, table, partitionName, isTempPartition, isForceDrop);
+ }
+ } finally {
+ table.writeUnlock();
+ }
+ }
+
private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
@@ -2184,6 +2263,8 @@ public class RestoreJob extends AbstractJob {
reserveReplica = Boolean.parseBoolean(properties.get(PROP_RESERVE_REPLICA));
reserveDynamicPartitionEnable = Boolean.parseBoolean(properties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
+ isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
+ isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
}
@Override
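Note that the two flags survive an FE restart: the constructor above records them as strings in the job's properties map, and the deserialization path parses them back with Boolean.parseBoolean, as the last hunk shows. A standalone sketch of that round-trip pattern (plain Java for illustration, not Doris code):

    import java.util.HashMap;
    import java.util.Map;

    public class CleanFlagsRoundTrip {
        static final String PROP_CLEAN_TABLES = "clean_tables";
        static final String PROP_CLEAN_PARTITIONS = "clean_partitions";

        public static void main(String[] args) {
            // Write side: store the flags as strings when the job is created.
            Map<String, String> properties = new HashMap<>();
            properties.put(PROP_CLEAN_TABLES, String.valueOf(true));
            properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(false));

            // Read side: recover the flags after the job is read back from metadata.
            boolean isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
            boolean isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
            System.out.println(isCleanTables + ", " + isCleanPartitions);
        }
    }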
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index a970d1798de..d8ed1b1ff06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -148,8 +148,8 @@ public class Partition extends MetaObject implements Writable {
public void updateVersionForRestore(long visibleVersion) {
this.setVisibleVersion(visibleVersion);
this.nextVersion = this.visibleVersion + 1;
- LOG.info("update partition {} version for restore: visible: {}, next:
{}",
- name, visibleVersion, nextVersion);
+ LOG.info("update partition {}({}) version for restore: visible: {},
next: {}",
+ name, id, visibleVersion, nextVersion);
}
public void updateVisibleVersion(long visibleVersion) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 60cda366e77..7a123e50fad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -898,35 +898,62 @@ public class InternalCatalog implements CatalogIf<Database> {
+ " please use \"DROP table FORCE\".");
}
}
- table.writeLock();
- long recycleTime = 0;
- try {
- if (table instanceof OlapTable && !stmt.isForceDrop()) {
- OlapTable olapTable = (OlapTable) table;
- if ((olapTable.getState() != OlapTableState.NORMAL)) {
- throw new DdlException("The table [" + tableName +
"]'s state is " + olapTable.getState()
- + ", cannot be dropped." + " please cancel the
operation on olap table firstly."
- + " If you want to forcibly drop(cannot be
recovered),"
- + " please use \"DROP table FORCE\".");
- }
- }
- unprotectDropTable(db, table, stmt.isForceDrop(), false, 0);
- if (!stmt.isForceDrop()) {
- recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(table.getId());
+
+ if (table instanceof OlapTable && !stmt.isForceDrop()) {
+ OlapTable olapTable = (OlapTable) table;
+ if ((olapTable.getState() != OlapTableState.NORMAL)) {
+ throw new DdlException("The table [" + tableName + "]'s
state is " + olapTable.getState()
+ + ", cannot be dropped." + " please cancel the
operation on olap table firstly."
+ + " If you want to forcibly drop(cannot be
recovered),"
+ + " please use \"DROP table FORCE\".");
}
- } finally {
- table.writeUnlock();
}
- DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, stmt.isForceDrop(), recycleTime);
- Env.getCurrentEnv().getEditLog().logDropTable(info);
- Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
- db.getId(), table.getId());
+
+ dropTableInternal(db, table, stmt.isForceDrop());
+ } catch (UserException e) {
+ throw new DdlException(e.getMessage(), e.getMysqlErrorCode());
} finally {
db.writeUnlock();
}
LOG.info("finished dropping table: {} from db: {}, is force: {}",
tableName, dbName, stmt.isForceDrop());
}
+ // drop table without any check.
+ public void dropTableWithoutCheck(Database db, Table table, boolean forceDrop) throws DdlException {
+ if (!db.writeLockIfExist()) {
+ return;
+ }
+ try {
+ LOG.info("drop table {} without check, force: {}",
table.getQualifiedName(), forceDrop);
+ dropTableInternal(db, table, forceDrop);
+ } catch (Exception e) {
+ LOG.warn("drop table without check", e);
+ throw e;
+ } finally {
+ db.writeUnlock();
+ }
+ }
+
+ // Drop a table; the caller must hold the db write lock.
+ private void dropTableInternal(Database db, Table table, boolean forceDrop) throws DdlException {
+ table.writeLock();
+ String tableName = table.getName();
+ long recycleTime = 0;
+ try {
+ unprotectDropTable(db, table, forceDrop, false, 0);
+ if (!forceDrop) {
+ recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(table.getId());
+ }
+ } finally {
+ table.writeUnlock();
+ }
+
+ DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, forceDrop, recycleTime);
+ Env.getCurrentEnv().getEditLog().logDropTable(info);
+ Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
+ db.getId(), table.getId());
+ }
+
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
long recycleTime) {
if (table.getType() == TableType.ELASTICSEARCH) {
@@ -1755,6 +1782,7 @@ public class InternalCatalog implements CatalogIf<Database> {
String partitionName = clause.getPartitionName();
boolean isTempPartition = clause.isTempPartition();
+ boolean isForceDrop = clause.isForceDrop();
olapTable.checkNormalStateForAlter();
if (!olapTable.checkPartitionNameExist(partitionName, isTempPartition)) {
@@ -1771,27 +1799,31 @@ public class InternalCatalog implements CatalogIf<Database> {
throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
}
- // drop
+ if (!isTempPartition && !isForceDrop) {
+ Partition partition = olapTable.getPartition(partitionName);
+ if (partition != null && Env.getCurrentGlobalTransactionMgr()
+ .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
+ throw new DdlException(
+ "There are still some transactions in the COMMITTED state waiting to be completed."
+ + " The partition [" + partitionName
+ + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
+ + " please use \"DROP partition FORCE\".");
+ }
+ }
+
+ dropPartitionWithoutCheck(db, olapTable, partitionName, isTempPartition, isForceDrop);
+ }
+
+ // drop partition without any check, the caller should hold the table write lock.
+ public void dropPartitionWithoutCheck(Database db, OlapTable olapTable, String partitionName,
+ boolean isTempPartition, boolean isForceDrop) throws DdlException {
Partition partition = null;
- long recycleTime = 0;
+ long recycleTime = -1;
if (isTempPartition) {
olapTable.dropTempPartition(partitionName, true);
} else {
- if (!clause.isForceDrop()) {
- partition = olapTable.getPartition(partitionName);
- if (partition != null) {
- if (Env.getCurrentEnv().getGlobalTransactionMgr()
- .existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) {
- throw new DdlException(
- "There are still some transactions in the COMMITTED state waiting to be completed."
- + " The partition [" + partitionName
- + "] cannot be dropped. If you want to forcibly drop(cannot be recovered),"
- + " please use \"DROP partition FORCE\".");
- }
- }
- }
- olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop());
- if (!clause.isForceDrop() && partition != null) {
+ partition = olapTable.dropPartition(db.getId(), partitionName, isForceDrop);
+ if (!isForceDrop && partition != null) {
recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(partition.getId());
}
}
@@ -1799,11 +1831,11 @@ public class InternalCatalog implements CatalogIf<Database> {
// log
long partitionId = partition == null ? -1L : partition.getId();
DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionId, partitionName,
- isTempPartition, clause.isForceDrop(), recycleTime);
+ isTempPartition, isForceDrop, recycleTime);
Env.getCurrentEnv().getEditLog().logDropPartition(info);
LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp
: {}, is force : {}",
- partitionName, olapTable.getId(), olapTable.getName(),
isTempPartition, clause.isForceDrop());
+ partitionName, olapTable.getId(), olapTable.getName(),
isTempPartition, isForceDrop);
}
public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a38921221fd..cbbd49f69a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2906,6 +2906,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
LabelName label = new LabelName(request.getDb(), request.getLabelName());
String repoName = request.getRepoName();
Map<String, String> properties = request.getProperties();
+
+ // Restore requires that all properties are known, so the old version of FE will not be able
+ // to recognize the properties of the new version. Therefore, request parameters are used here
+ // instead of directly putting them in properties to avoid compatibility issues of cross-version
+ // synchronization.
+ if (request.isCleanPartitions()) {
+ properties.put(RestoreStmt.PROP_CLEAN_PARTITIONS, "true");
+ }
+ if (request.isCleanTables()) {
+ properties.put(RestoreStmt.PROP_CLEAN_TABLES, "true");
+ }
+
AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
List<TableRef> tableRefs = new ArrayList<>();
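Because the flags are carried as dedicated request fields rather than generic property entries, an old FE simply ignores the unknown optional fields instead of rejecting the job. A client-side sketch, assuming the standard Thrift-generated Java bindings for TRestoreSnapshotRequest (the setter names follow Thrift's codegen conventions and all values are hypothetical):

    import org.apache.doris.thrift.TRestoreSnapshotRequest;

    public class RestoreRequestSketch {
        public static TRestoreSnapshotRequest build() {
            TRestoreSnapshotRequest request = new TRestoreSnapshotRequest();
            request.setDb("example_db");            // hypothetical database
            request.setLabelName("example_label");  // hypothetical restore label
            request.setRepoName("example_repo");    // hypothetical repository
            // New optional fields added in this patch; the FE maps them to the
            // clean_tables / clean_partitions properties as shown above.
            request.setCleanTables(true);
            request.setCleanPartitions(true);
            return request;
        }
    }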
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 5f426aa3311..839d11e8199 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -251,7 +251,7 @@ public class RestoreJobTest {
db.dropTable(expectedRestoreTbl.getName());
job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
- new ReplicaAllocation((short) 3), 100000, -1, false, false, false, env, repo.getId());
+ new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, env, repo.getId());
List<Table> tbls = Lists.newArrayList();
List<Resource> resources = Lists.newArrayList();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 75a6537ee85..9e43d35f8f1 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1102,6 +1102,8 @@ struct TRestoreSnapshotRequest {
10: optional map<string, string> properties
11: optional binary meta
12: optional binary job_info
+ 13: optional bool clean_tables
+ 14: optional bool clean_partitions
}
struct TRestoreSnapshotResult {
diff --git a/regression-test/suites/backup_restore/test_backup_restore_clean_restore.groovy b/regression-test/suites/backup_restore/test_backup_restore_clean_restore.groovy
new file mode 100644
index 00000000000..c80bc0d0060
--- /dev/null
+++ b/regression-test/suites/backup_restore/test_backup_restore_clean_restore.groovy
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_clean_restore", "backup_restore") {
+ String suiteName = "test_backup_restore_clean_restore"
+ String dbName = "${suiteName}_db"
+ String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ String snapshotName = "${suiteName}_snapshot"
+ String tableNamePrefix = "${suiteName}_tables"
+
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+
+ String tableName1 = "${tableNamePrefix}_1"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName1}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName1} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION `p1` VALUES LESS THAN ("0"),
+ PARTITION `p2` VALUES LESS THAN ("10"),
+ PARTITION `p3` VALUES LESS THAN ("20")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+
+ def numRows = 20
+ List<String> values = []
+ for (int j = 0; j < numRows; ++j) {
+ values.add("(${j}, ${j})")
+ }
+ sql "INSERT INTO ${dbName}.${tableName1} VALUES ${values.join(",")}"
+ def result = sql "SELECT * FROM ${dbName}.${tableName1}"
+ assertEquals(result.size(), numRows);
+
+ String tableName2 = "${tableNamePrefix}_2"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName2}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName2} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION `p1` VALUES LESS THAN ("0"),
+ PARTITION `p2` VALUES LESS THAN ("10"),
+ PARTITION `p3` VALUES LESS THAN ("20")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+
+ sql "INSERT INTO ${dbName}.${tableName2} VALUES ${values.join(",")}"
+ result = sql "SELECT * FROM ${dbName}.${tableName2}"
+ assertEquals(result.size(), numRows);
+
+ String tableName3 = "${tableNamePrefix}_3"
+ sql "DROP TABLE IF EXISTS ${dbName}.${tableName3}"
+ sql """
+ CREATE TABLE ${dbName}.${tableName3} (
+ `id` LARGEINT NOT NULL,
+ `count` LARGEINT SUM DEFAULT "0"
+ )
+ AGGREGATE KEY(`id`)
+ PARTITION BY RANGE(`id`)
+ (
+ PARTITION `p1` VALUES LESS THAN ("0"),
+ PARTITION `p2` VALUES LESS THAN ("10"),
+ PARTITION `p3` VALUES LESS THAN ("20")
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES
+ (
+ "replication_num" = "1"
+ )
+ """
+
+ sql "INSERT INTO ${dbName}.${tableName3} VALUES ${values.join(",")}"
+ result = sql "SELECT * FROM ${dbName}.${tableName3}"
+ assertEquals(result.size(), numRows);
+
+
+ sql """
+ BACKUP SNAPSHOT ${dbName}.${snapshotName}
+ TO `${repoName}`
+ """
+
+ while (!syncer.checkSnapshotFinish(dbName)) {
+ Thread.sleep(3000)
+ }
+
+ def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
+ assertTrue(snapshot != null)
+
+ // restore table1, partition p3 of table2
+ sql """
+ RESTORE SNAPSHOT ${dbName}.${snapshotName}
+ FROM `${repoName}`
+ ON (
+ `${tableName1}`,
+ `${tableName2}` PARTITION (`p3`)
+ )
+ PROPERTIES
+ (
+ "backup_timestamp" = "${snapshot}",
+ "reserve_replica" = "true",
+ "clean_tables" = "true",
+ "clean_partitions" = "true"
+ )
+ """
+
+ while (!syncer.checkAllRestoreFinish(dbName)) {
+ Thread.sleep(3000)
+ }
+
+ // all data of table 1 must exist
+ result = sql "SELECT * FROM ${dbName}.${tableName1}"
+ assertEquals(result.size(), numRows);
+
+ // only data in p3 of table 2 exists
+ result = sql "SELECT * FROM ${dbName}.${tableName2}"
+ assertEquals(result.size(), numRows-10)
+
+ // table3 is dropped
+ result = sql """
+ SHOW TABLE STATUS FROM ${dbName} LIKE "${tableName3}"
+ """
+ assertEquals(result.size(), 0)
+
+ sql "DROP TABLE ${dbName}.${tableName1} FORCE"
+ sql "DROP TABLE ${dbName}.${tableName2} FORCE"
+ sql "DROP DATABASE ${dbName} FORCE"
+ sql "DROP REPOSITORY `${repoName}`"
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]