This is an automated email from the ASF dual-hosted git repository.
xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d11bb11592b [enhancement](backup/restore) support alter s3 repo info
about ak/sk/token (#27027)
d11bb11592b is described below
commit d11bb11592b3377c7a6aef337af30154cb2119f9
Author: xueweizhang <[email protected]>
AuthorDate: Thu Dec 21 13:21:55 2023 +0800
[enhancement](backup/restore) support alter s3 repo info about ak/sk/token
(#27027)
In some cases:
s3.session_token/AWS_TOKEN expires after a few hours, so an upload
snapshot job may fail if the data is very large;
for the same reason, the repo's credentials will also have expired when
RepositoryMgr executes its repo ping.
So we need to support altering the ak/sk/token properties of an s3 repo,
and updating the upload snapshot job's properties, so that the backup can
continue.
Signed-off-by: nextdreamblue <[email protected]>
---
fe/fe-core/src/main/cup/sql_parser.cup | 4 +
.../apache/doris/analysis/AlterRepositoryStmt.java | 86 ++++++++++++++++++++++
.../java/org/apache/doris/backup/AbstractJob.java | 2 +
.../org/apache/doris/backup/BackupHandler.java | 45 +++++++++++
.../java/org/apache/doris/backup/BackupJob.java | 22 ++++++
.../java/org/apache/doris/backup/Repository.java | 35 +++++++++
.../org/apache/doris/backup/RepositoryMgr.java | 26 +++++++
.../java/org/apache/doris/backup/RestoreJob.java | 22 ++++++
.../org/apache/doris/journal/JournalEntity.java | 5 ++
.../java/org/apache/doris/persist/EditLog.java | 9 +++
.../org/apache/doris/persist/OperationType.java | 2 +
.../main/java/org/apache/doris/qe/DdlExecutor.java | 3 +
.../java/org/apache/doris/task/AgentTaskQueue.java | 12 +++
.../java/org/apache/doris/task/DownloadTask.java | 4 +
.../java/org/apache/doris/task/UploadTask.java | 5 ++
15 files changed, 282 insertions(+)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 838f6d7a757..26b221c128c 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1413,6 +1413,10 @@ alter_stmt ::=
{:
RESULT = new AlterUserStmt(ifExists, user, null, passwdOptions);
:}
+ | KW_ALTER KW_REPOSITORY ident:repoName opt_properties:properties
+ {:
+ RESULT = new AlterRepositoryStmt(repoName, properties);
+ :}
;
opt_datasource_properties ::=
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
new file mode 100644
index 00000000000..b0ecd6d3bd4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlterRepositoryStmt extends DdlStmt {
+ private String name;
+ private Map<String, String> properties;
+
+ public AlterRepositoryStmt(String name, Map<String, String> properties) {
+ this.name = name;
+ this.properties = properties;
+ if (this.properties == null) {
+ this.properties = Maps.newHashMap();
+ }
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ // check auth
+ if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ FeNameFormat.checkCommonName("repository", name);
+ Map<String, String> copyProperties = new HashMap<>(properties);
+ if (copyProperties.size() == 0) {
+ throw new UserException("alter repository need contains
ak/sk/token info of s3.");
+ }
+ copyProperties.remove(S3Properties.ACCESS_KEY);
+ copyProperties.remove(S3Properties.SECRET_KEY);
+ copyProperties.remove(S3Properties.SESSION_TOKEN);
+ copyProperties.remove(S3Properties.Env.ACCESS_KEY);
+ copyProperties.remove(S3Properties.Env.SECRET_KEY);
+ copyProperties.remove(S3Properties.Env.TOKEN);
+ if (copyProperties.size() != 0) {
+ throw new UserException("alter repository only support ak/sk/token
info of s3."
+ + " unsupported properties: " + copyProperties.keySet());
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALTER REPOSITORY '").append(name).append("' ");
+ sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ",
true, false)).append(")");
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index d2f1dd11979..0df9155ab34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -155,6 +155,8 @@ public abstract class AbstractJob implements Writable {
public abstract boolean isCancelled();
+ public abstract Status updateRepo(Repository repo);
+
public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 5279175fa00..7c0485228f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -19,6 +19,7 @@ package org.apache.doris.backup;
import org.apache.doris.analysis.AbstractBackupStmt;
import org.apache.doris.analysis.AbstractBackupTableRefClause;
+import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.BackupStmt.BackupType;
import org.apache.doris.analysis.CancelBackupStmt;
@@ -49,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
@@ -222,6 +224,49 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
}
+ public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
+ tryLock();
+ try {
+ Repository repo = repoMgr.getRepo(stmt.getName());
+ if (repo == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Repository does not exist");
+ }
+
+ if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
+ Map<String, String> oldProperties = new
HashMap<>(stmt.getProperties());
+ Status status =
repo.alterRepositoryS3Properties(oldProperties);
+ if (!status.ok()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
status.getErrMsg());
+ }
+ RemoteFileSystem fileSystem =
FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
+ StorageBackend.StorageType.S3, oldProperties);
+ Repository newRepo = new Repository(repo.getId(),
repo.getName(), repo.isReadOnly(),
+ repo.getLocation(), fileSystem);
+ if (!newRepo.ping()) {
+ LOG.warn("Failed to connect repository {}. msg: {}",
repo.getName(), repo.getErrorMsg());
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+ "Repo can not ping with new s3 properties");
+ }
+
+ Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
+ if (!st.ok()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+ "Failed to alter repository: " + st.getErrMsg());
+ }
+ for (AbstractJob job : getAllCurrentJobs()) {
+ if (!job.isDone() && job.getRepoId() == repo.getId()) {
+ job.updateRepo(newRepo);
+ }
+ }
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+ "Only support alter s3
repository");
+ }
+ } finally {
+ seqlock.unlock();
+ }
+ }
+
// handle drop repository stmt
public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
tryLock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index d764b926e79..bed797b9167 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -273,6 +273,28 @@ public class BackupJob extends AbstractJob {
return state == BackupJobState.CANCELLED;
}
+ @Override
+ public synchronized Status updateRepo(Repository repo) {
+ this.repo = repo;
+
+ if (this.state == BackupJobState.UPLOADING) {
+ for (Map.Entry<Long, Long> entry : unfinishedTaskIds.entrySet()) {
+ long signature = entry.getKey();
+ long beId = entry.getValue();
+ AgentTask task = AgentTaskQueue.getTask(beId,
TTaskType.UPLOAD, signature);
+ if (task == null || task.getTaskType() != TTaskType.UPLOAD) {
+ continue;
+ }
+ ((UploadTask) task).updateBrokerProperties(
+
S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+ AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature,
task);
+ }
+ LOG.info("finished to update upload job properties. {}", this);
+ }
+ LOG.info("finished to update repo of job. {}", this);
+ return Status.OK;
+ }
+
// Polling the job state and do the right things.
@Override
public synchronized void run() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 58dff0d7324..3c5fdd3578e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFile;
@@ -58,7 +59,10 @@ import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
/*
@@ -295,6 +299,37 @@ public class Repository implements Writable {
}
}
+ public Status alterRepositoryS3Properties(Map<String, String> properties) {
+ if (fileSystem instanceof S3FileSystem) {
+ Map<String, String> oldProperties = new
HashMap<>(this.getRemoteFileSystem().getProperties());
+ oldProperties.remove(S3Properties.ACCESS_KEY);
+ oldProperties.remove(S3Properties.SECRET_KEY);
+ oldProperties.remove(S3Properties.SESSION_TOKEN);
+ oldProperties.remove(S3Properties.Env.ACCESS_KEY);
+ oldProperties.remove(S3Properties.Env.SECRET_KEY);
+ oldProperties.remove(S3Properties.Env.TOKEN);
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
+ || Objects.equals(entry.getKey(),
S3Properties.Env.ACCESS_KEY)) {
+ oldProperties.putIfAbsent(S3Properties.ACCESS_KEY,
entry.getValue());
+ }
+ if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
+ || Objects.equals(entry.getKey(),
S3Properties.Env.SECRET_KEY)) {
+ oldProperties.putIfAbsent(S3Properties.SECRET_KEY,
entry.getValue());
+ }
+ if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
+ || Objects.equals(entry.getKey(),
S3Properties.Env.TOKEN)) {
+ oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN,
entry.getValue());
+ }
+ }
+ properties.clear();
+ properties.putAll(oldProperties);
+ return Status.OK;
+ } else {
+ return new Status(ErrCode.COMMON_ERROR, "Only support alter s3
repository");
+ }
+ }
+
// eg: location/__palo_repository_repo_name/__repo_info
public String assembleRepoInfoFilePath() {
return Joiner.on(PATH_DELIMITER).join(location,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
index 3866c55331c..e923ba4e044 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
@@ -21,6 +21,7 @@ import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
+import org.apache.doris.fs.remote.S3FileSystem;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -96,6 +97,31 @@ public class RepositoryMgr extends Daemon implements
Writable {
return repoIdMap.get(repoId);
}
+ public Status alterRepo(Repository newRepo, boolean isReplay) {
+ lock.lock();
+ try {
+ Repository repo = repoNameMap.get(newRepo.getName());
+ if (repo != null) {
+ if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
+ repoNameMap.put(repo.getName(), newRepo);
+ repoIdMap.put(repo.getId(), newRepo);
+
+ if (!isReplay) {
+ // log
+
Env.getCurrentEnv().getEditLog().logAlterRepository(newRepo);
+ }
+ LOG.info("successfully alter repo {}, isReplay {}",
newRepo.getName(), isReplay);
+ return Status.OK;
+ } else {
+ return new Status(ErrCode.COMMON_ERROR, "Only support
alter s3 repository");
+ }
+ }
+ return new Status(ErrCode.NOT_FOUND, "repository does not exist");
+ } finally {
+ lock.unlock();
+ }
+ }
+
public Status removeRepo(String repoName, boolean isReplay) {
lock.lock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index d0e7788a380..6d4f68e985d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -338,6 +338,28 @@ public class RestoreJob extends AbstractJob {
return state == RestoreJobState.CANCELLED;
}
+ @Override
+ public synchronized Status updateRepo(Repository repo) {
+ this.repo = repo;
+
+ if (this.state == RestoreJobState.DOWNLOADING) {
+ for (Map.Entry<Long, Long> entry :
unfinishedSignatureToId.entrySet()) {
+ long signature = entry.getKey();
+ long beId = entry.getValue();
+ AgentTask task = AgentTaskQueue.getTask(beId,
TTaskType.DOWNLOAD, signature);
+ if (task == null || task.getTaskType() != TTaskType.DOWNLOAD) {
+ continue;
+ }
+ ((DownloadTask) task).updateBrokerProperties(
+
S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+ AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature,
task);
+ }
+ LOG.info("finished to update download job properties. {}", this);
+ }
+ LOG.info("finished to update repo of job. {}", this);
+ return Status.OK;
+ }
+
@Override
public void run() {
if (state == RestoreJobState.FINISHED || state ==
RestoreJobState.CANCELLED) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 9b1d32d0642..aae8f4e4e7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -888,6 +888,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_ALTER_REPOSITORY: {
+ data = Repository.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index ba0ebc35c56..ea8292712af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1133,6 +1133,11 @@ public class EditLog {
env.getAlterInstance().processAlterMTMV(alterMtmv, true);
break;
}
+ case OperationType.OP_ALTER_REPOSITORY: {
+ Repository repository = (Repository) journal.getData();
+ env.getBackupHandler().getRepoMgr().alterRepo(repository,
true);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1544,6 +1549,10 @@ public class EditLog {
logEdit(OperationType.OP_DROP_REPOSITORY, new Text(repoName));
}
+ public void logAlterRepository(Repository repo) {
+ logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
+ }
+
public void logRestoreJob(RestoreJob job) {
logEdit(OperationType.OP_RESTORE_JOB, job);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index ada6adf53d2..87b6b5bacfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -351,6 +351,8 @@ public class OperationType {
public static final short OP_ALTER_MTMV = 459;
+ public static final short OP_ALTER_REPOSITORY = 460;
+
/**
* Get opcode name by op code.
**/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 7a47dbcb541..47682cd74f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -39,6 +39,7 @@ import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterJobStatusStmt;
import org.apache.doris.analysis.AlterPolicyStmt;
+import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
@@ -388,6 +389,8 @@ public class DdlExecutor {
} else if (ddlStmt instanceof DropAnalyzeJobStmt) {
DropAnalyzeJobStmt analyzeJobStmt = (DropAnalyzeJobStmt) ddlStmt;
Env.getCurrentEnv().getAnalysisManager().dropAnalyzeJob(analyzeJobStmt);
+ } else if (ddlStmt instanceof AlterRepositoryStmt) {
+ env.getBackupHandler().alterRepository((AlterRepositoryStmt)
ddlStmt);
} else {
LOG.warn("Unkown statement " + ddlStmt.getClass());
throw new DdlException("Unknown statement.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 1677701272c..c22f0b2ee36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -135,6 +135,18 @@ public class AgentTaskQueue {
return signatureMap.get(signature);
}
+ public static synchronized void updateTask(long backendId, TTaskType type,
long signature, AgentTask newTask) {
+ if (!tasks.contains(backendId, type)) {
+ return;
+ }
+
+ Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
+ if (!signatureMap.containsKey(signature)) {
+ return;
+ }
+ signatureMap.put(signature, newTask);
+ }
+
// this is just for unit test
public static synchronized List<AgentTask> getTask(TTaskType type) {
List<AgentTask> res = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
index 6482c5f807b..5dbbc46b7b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -78,6 +78,10 @@ public class DownloadTask extends AgentTask {
return brokerProperties;
}
+ public void updateBrokerProperties(Map<String, String> brokerProperties) {
+ this.brokerProperties = new java.util.HashMap<>(brokerProperties);
+ }
+
public TDownloadReq toThrift() {
// these fields are required
// 1: required i64 job_id
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
index f41d722825a..b2a4ae489ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
@@ -24,6 +24,7 @@ import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUploadReq;
+import java.util.HashMap;
import java.util.Map;
public class UploadTask extends AgentTask {
@@ -64,6 +65,10 @@ public class UploadTask extends AgentTask {
return brokerProperties;
}
+ public void updateBrokerProperties(Map<String, String> brokerProperties) {
+ this.brokerProperties = new HashMap<>(brokerProperties);
+ }
+
public TUploadReq toThrift() {
TNetworkAddress address = new TNetworkAddress(broker.host,
broker.port);
TUploadReq request = new TUploadReq(jobId, srcToDestPath, address);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]