This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a269f4c0335 [fix](restore) Add synchronized to avoid concurrent
modification (#43172)
a269f4c0335 is described below
commit a269f4c0335584bc11ee795653a963652cef931b
Author: walter <[email protected]>
AuthorDate: Tue Nov 5 15:16:10 2024 +0800
[fix](restore) Add synchronized to avoid concurrent modification (#43172)
Without synchronized protection, when the user manually executes cancel,
the backupHandler may also trigger cancel simultaneously. If both modify
the snapshotInfo at the same time, a ConcurrentModificationException
will be thrown.
The PENDING state synchronously waits for the replica-creation tasks to
finish. Therefore, a CREATING state is added to convert the synchronous
wait into asynchronous polling, so that the user's cancel/list-job-info
requests are not blocked.
---
.../java/org/apache/doris/backup/BackupJob.java | 4 +-
.../java/org/apache/doris/backup/RestoreJob.java | 206 +++++++++++----------
.../test_backup_restore_atomic_with_alter.groovy | 2 +-
3 files changed, 107 insertions(+), 105 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index dbc7bb08fd4..ab7bfd8a03f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -100,7 +100,7 @@ public class BackupJob extends AbstractJob {
private List<TableRef> tableRefs = Lists.newArrayList();
@SerializedName("st")
- private BackupJobState state;
+ private volatile BackupJobState state;
@SerializedName("sft")
private long snapshotFinishedTime = -1;
@@ -1025,7 +1025,7 @@ public class BackupJob extends AbstractJob {
LOG.info("finished to cancel backup job. current state: {}. {}",
curState.name(), this);
}
- public List<String> getInfo() {
+ public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
info.add(label);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index b60c5489767..59582edb03e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -62,7 +62,6 @@ import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
@@ -132,7 +131,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
public enum RestoreJobState {
PENDING, // Job is newly created. Check and prepare meta in catalog.
Create replica if necessary.
// Waiting for replica creation finished synchronously, then
sending snapshot tasks.
- // then transfer to SNAPSHOTING
+ // then transfer to CREATING.
+ CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all
replicas created.
SNAPSHOTING, // Waiting for snapshot finished. Than transfer to
DOWNLOAD.
DOWNLOAD, // Send download tasks.
DOWNLOADING, // Waiting for download finished.
@@ -153,7 +153,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
private boolean allowLoad;
@SerializedName("st")
- private RestoreJobState state;
+ private volatile RestoreJobState state;
@SerializedName("meta")
private BackupMeta backupMeta;
@@ -215,6 +215,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();
+ private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;
+
public RestoreJob() {
super(JobType.RESTORE);
}
@@ -270,10 +272,6 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
return state;
}
- public RestoreFileMapping getFileMapping() {
- return fileMapping;
- }
-
public int getMetaVersion() {
return metaVersion;
}
@@ -421,7 +419,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
@Override
- public void run() {
+ public synchronized void run() {
if (state == RestoreJobState.FINISHED || state ==
RestoreJobState.CANCELLED) {
return;
}
@@ -447,8 +445,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
checkIfNeedCancel();
if (status.ok()) {
- if (state != RestoreJobState.PENDING && label.equals(
-
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", "")))
{
+ if (state != RestoreJobState.PENDING && state !=
RestoreJobState.CREATING
+ &&
label.equals(DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB",
""))) {
LOG.info("pause restore job by debug point: {}", this);
return;
}
@@ -457,6 +455,9 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
case PENDING:
checkAndPrepareMeta();
break;
+ case CREATING:
+ waitingAllReplicasCreated();
+ break;
case SNAPSHOTING:
waitingAllSnapshotsFinished();
break;
@@ -486,7 +487,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
* return true if some restored objs have been dropped.
*/
private void checkIfNeedCancel() {
- if (state == RestoreJobState.PENDING) {
+ if (state == RestoreJobState.PENDING || state ==
RestoreJobState.CREATING) {
return;
}
@@ -956,121 +957,122 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
// Send create replica task to BE outside the db lock
- boolean ok = false;
int numBatchTasks = batchTaskPerTable.values()
.stream()
.mapToInt(AgentBatchTask::getTaskNum)
.sum();
- MarkedCountDownLatch<Long, Long> latch = new
MarkedCountDownLatch<Long, Long>(numBatchTasks);
- if (batchTaskPerTable.size() > 0) {
+ createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
+ if (numBatchTasks > 0) {
+ LOG.info("begin to send create replica tasks to BE for restore.
total {} tasks. {}", numBatchTasks, this);
for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
for (AgentTask task : batchTask.getAllTasks()) {
- latch.addMark(task.getBackendId(), task.getTabletId());
- ((CreateReplicaTask) task).setLatch(latch);
+ createReplicaTasksLatch.addMark(task.getBackendId(),
task.getTabletId());
+ ((CreateReplicaTask)
task).setLatch(createReplicaTasksLatch);
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
}
-
- // estimate timeout
- long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks) /
1000;
- try {
- LOG.info("begin to send create replica tasks to BE for
restore. total {} tasks. timeout: {}s",
- numBatchTasks, timeout);
- for (long elapsed = 0; elapsed <= timeout; elapsed++) {
- if (latch.await(1, TimeUnit.SECONDS)) {
- ok = true;
- break;
- }
- if (state != RestoreJobState.PENDING) { // user cancelled
- return;
- }
- if (elapsed % 5 == 0) {
- LOG.info("waiting {} create replica tasks for restore
to finish, total {} tasks, elapsed {}s",
- latch.getCount(), numBatchTasks, elapsed);
- }
- }
- } catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
- ok = false;
- }
- } else {
- ok = true;
}
- if (ok && latch.getStatus().ok()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("finished to create all restored replicas. {}",
this);
- }
- // add restored partitions.
- // table should be in State RESTORE, so no other partitions can be
- // added to or removed from this table during the restore process.
- for (Pair<String, Partition> entry : restoredPartitions) {
- OlapTable localTbl = (OlapTable)
db.getTableNullable(entry.first);
- localTbl.writeLock();
- try {
- Partition restoredPart = entry.second;
- OlapTable remoteTbl = (OlapTable)
backupMeta.getTable(entry.first);
- if (localTbl.getPartitionInfo().getType() ==
PartitionType.RANGE
- || localTbl.getPartitionInfo().getType() ==
PartitionType.LIST) {
-
- PartitionInfo remotePartitionInfo =
remoteTbl.getPartitionInfo();
- PartitionInfo localPartitionInfo =
localTbl.getPartitionInfo();
- BackupPartitionInfo backupPartitionInfo
- =
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
- long remotePartId = backupPartitionInfo.id;
- PartitionItem remoteItem =
remoteTbl.getPartitionInfo().getItem(remotePartId);
- DataProperty remoteDataProperty =
remotePartitionInfo.getDataProperty(remotePartId);
- ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
- if (reserveReplica) {
- restoreReplicaAlloc =
remotePartitionInfo.getReplicaAllocation(remotePartId);
- }
- localPartitionInfo.addPartition(restoredPart.getId(),
false, remoteItem,
- remoteDataProperty, restoreReplicaAlloc,
-
remotePartitionInfo.getIsInMemory(remotePartId),
-
remotePartitionInfo.getIsMutable(remotePartId));
- }
- localTbl.addPartition(restoredPart);
- } finally {
- localTbl.writeUnlock();
- }
+ // No log here, PENDING state restore job will redo this method
+ state = RestoreJobState.CREATING;
+ }
+ private void waitingAllReplicasCreated() {
+ boolean ok = true;
+ try {
+ if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) {
+ LOG.info("waiting {} create replica tasks for restore to
finish. {}",
+ createReplicaTasksLatch.getCount(), this);
+ return;
}
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException, {}", this, e);
+ ok = false;
+ }
- // add restored tables
- for (Table tbl : restoredTbls) {
- if (!db.writeLockIfExist()) {
- status = new Status(ErrCode.COMMON_ERROR, "Database " +
db.getFullName()
- + " has been dropped");
- return;
- }
- tbl.writeLock();
- try {
- if (!db.registerTable(tbl)) {
- status = new Status(ErrCode.COMMON_ERROR, "Table " +
tbl.getName()
- + " already exist in db: " + db.getFullName());
- return;
- }
- } finally {
- tbl.writeUnlock();
- db.writeUnlock();
- }
- }
- } else {
+ if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
// only show at most 10 results
- List<String> subList = latch.getLeftMarks().stream().limit(10)
+ List<String> subList =
createReplicaTasksLatch.getLeftMarks().stream().limit(10)
.map(item -> "(backendId = " + item.getKey() + ", tabletId
= " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
String reason = "TIMEDOUT";
- if (!latch.getStatus().ok()) {
- reason = latch.getStatus().getErrorMsg();
+ if (!createReplicaTasksLatch.getStatus().ok()) {
+ reason = createReplicaTasksLatch.getStatus().getErrorMsg();
}
String errMsg = String.format(
"Failed to create replicas for restore: %s, unfinished
marks: %s", reason, idStr);
status = new Status(ErrCode.COMMON_ERROR, errMsg);
return;
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finished to create all restored replicas. {}", this);
+ }
+ allReplicasCreated();
+ }
+
+ private void allReplicasCreated() {
+ Database db = env.getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does
not exist");
+ return;
+ }
+
+ // add restored partitions.
+ // table should be in State RESTORE, so no other partitions can be
+ // added to or removed from this table during the restore process.
+ for (Pair<String, Partition> entry : restoredPartitions) {
+ OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
+ localTbl.writeLock();
+ try {
+ Partition restoredPart = entry.second;
+ OlapTable remoteTbl = (OlapTable)
backupMeta.getTable(entry.first);
+ if (localTbl.getPartitionInfo().getType() ==
PartitionType.RANGE
+ || localTbl.getPartitionInfo().getType() ==
PartitionType.LIST) {
+
+ PartitionInfo remotePartitionInfo =
remoteTbl.getPartitionInfo();
+ PartitionInfo localPartitionInfo =
localTbl.getPartitionInfo();
+ BackupPartitionInfo backupPartitionInfo
+ =
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
+ long remotePartId = backupPartitionInfo.id;
+ PartitionItem remoteItem =
remoteTbl.getPartitionInfo().getItem(remotePartId);
+ DataProperty remoteDataProperty =
remotePartitionInfo.getDataProperty(remotePartId);
+ ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
+ if (reserveReplica) {
+ restoreReplicaAlloc =
remotePartitionInfo.getReplicaAllocation(remotePartId);
+ }
+ localPartitionInfo.addPartition(restoredPart.getId(),
false, remoteItem,
+ remoteDataProperty, restoreReplicaAlloc,
+ remotePartitionInfo.getIsInMemory(remotePartId),
+ remotePartitionInfo.getIsMutable(remotePartId));
+ }
+ localTbl.addPartition(restoredPart);
+ } finally {
+ localTbl.writeUnlock();
+ }
+ }
+
+ // add restored tables
+ for (Table tbl : restoredTbls) {
+ if (!db.writeLockIfExist()) {
+ status = new Status(ErrCode.COMMON_ERROR, "Database " +
db.getFullName() + " has been dropped");
+ return;
+ }
+ tbl.writeLock();
+ try {
+ if (!db.registerTable(tbl)) {
+ status = new Status(ErrCode.COMMON_ERROR, "Table " +
tbl.getName()
+ + " already exist in db: " + db.getFullName());
+ return;
+ }
+ } finally {
+ tbl.writeUnlock();
+ db.writeUnlock();
+ }
+ }
+
LOG.info("finished to prepare meta. {}", this);
if (jobInfo.content == null || jobInfo.content == BackupContent.ALL) {
@@ -2213,7 +2215,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
return getInfo(false);
}
- public List<String> getInfo(boolean isBrief) {
+ public synchronized List<String> getInfo(boolean isBrief) {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
info.add(label);
@@ -2272,7 +2274,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
return Status.OK;
}
- public void cancelInternal(boolean isReplay) {
+ private void cancelInternal(boolean isReplay) {
// We need to clean the residual due to current state
if (!isReplay) {
switch (state) {
diff --git
a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
index 46a3ca5b29d..b66ff391f15 100644
---
a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
+++
b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
@@ -112,7 +112,7 @@ suite("test_backup_restore_atomic_with_alter",
"backup_restore") {
boolean restore_paused = false
for (int k = 0; k < 60; k++) {
def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName}
WHERE Label = "${snapshotName}" """
- if (records.size() == 1 && records[0].State != 'PENDING') {
+ if (records.size() == 1 && (records[0].State != 'PENDING' &&
records[0].State != 'CREATING')) {
restore_paused = true
break
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]