This is an automated email from the ASF dual-hosted git repository.

xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d11bb11592b [enhancement](backup/restore) support alter s3 repo info 
about ak/sk/token (#27027)
d11bb11592b is described below

commit d11bb11592b3377c7a6aef337af30154cb2119f9
Author: xueweizhang <[email protected]>
AuthorDate: Thu Dec 21 13:21:55 2023 +0800

    [enhancement](backup/restore) support alter s3 repo info about ak/sk/token 
(#27027)
    
    in some cases:
    
    s3.session_token/AWS_TOKEN expires after a few hours, so the upload 
snapshot job may fail if the data set is very large;
    for the same reason, the repo credentials also expire when RepositoryMgr pings the repo;
    so we need to support altering the s3 repo ak/sk/token properties and 
updating the upload snapshot job properties so the backup can continue.
    
    Signed-off-by: nextdreamblue <[email protected]>
---
 fe/fe-core/src/main/cup/sql_parser.cup             |  4 +
 .../apache/doris/analysis/AlterRepositoryStmt.java | 86 ++++++++++++++++++++++
 .../java/org/apache/doris/backup/AbstractJob.java  |  2 +
 .../org/apache/doris/backup/BackupHandler.java     | 45 +++++++++++
 .../java/org/apache/doris/backup/BackupJob.java    | 22 ++++++
 .../java/org/apache/doris/backup/Repository.java   | 35 +++++++++
 .../org/apache/doris/backup/RepositoryMgr.java     | 26 +++++++
 .../java/org/apache/doris/backup/RestoreJob.java   | 22 ++++++
 .../org/apache/doris/journal/JournalEntity.java    |  5 ++
 .../java/org/apache/doris/persist/EditLog.java     |  9 +++
 .../org/apache/doris/persist/OperationType.java    |  2 +
 .../main/java/org/apache/doris/qe/DdlExecutor.java |  3 +
 .../java/org/apache/doris/task/AgentTaskQueue.java | 12 +++
 .../java/org/apache/doris/task/DownloadTask.java   |  4 +
 .../java/org/apache/doris/task/UploadTask.java     |  5 ++
 15 files changed, 282 insertions(+)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 838f6d7a757..26b221c128c 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1413,6 +1413,10 @@ alter_stmt ::=
     {:
         RESULT = new AlterUserStmt(ifExists, user, null, passwdOptions);
     :}
+    | KW_ALTER KW_REPOSITORY ident:repoName opt_properties:properties
+    {:
+        RESULT = new AlterRepositoryStmt(repoName, properties);
+    :}
     ;
 
 opt_datasource_properties ::=
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
new file mode 100644
index 00000000000..b0ecd6d3bd4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRepositoryStmt.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlterRepositoryStmt extends DdlStmt {
+    private String name;
+    private Map<String, String> properties;
+
+    public AlterRepositoryStmt(String name, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+        if (this.properties == null) {
+            this.properties = Maps.newHashMap();
+        }
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        // check auth
+        if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+        }
+        FeNameFormat.checkCommonName("repository", name);
+        Map<String, String> copyProperties = new HashMap<>(properties);
+        if (copyProperties.size() == 0) {
+            throw new UserException("alter repository need contains 
ak/sk/token info of s3.");
+        }
+        copyProperties.remove(S3Properties.ACCESS_KEY);
+        copyProperties.remove(S3Properties.SECRET_KEY);
+        copyProperties.remove(S3Properties.SESSION_TOKEN);
+        copyProperties.remove(S3Properties.Env.ACCESS_KEY);
+        copyProperties.remove(S3Properties.Env.SECRET_KEY);
+        copyProperties.remove(S3Properties.Env.TOKEN);
+        if (copyProperties.size() != 0) {
+            throw new UserException("alter repository only support ak/sk/token 
info of s3."
+                + " unsupported properties: " + copyProperties.keySet());
+        }
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ALTER REPOSITORY '").append(name).append("' ");
+        sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", 
true, false)).append(")");
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index d2f1dd11979..0df9155ab34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -155,6 +155,8 @@ public abstract class AbstractJob implements Writable {
 
     public abstract boolean isCancelled();
 
+    public abstract Status updateRepo(Repository repo);
+
     public static AbstractJob read(DataInput in) throws IOException {
         AbstractJob job = null;
         JobType type = JobType.valueOf(Text.readString(in));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 5279175fa00..7c0485228f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -19,6 +19,7 @@ package org.apache.doris.backup;
 
 import org.apache.doris.analysis.AbstractBackupStmt;
 import org.apache.doris.analysis.AbstractBackupTableRefClause;
+import org.apache.doris.analysis.AlterRepositoryStmt;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.BackupStmt.BackupType;
 import org.apache.doris.analysis.CancelBackupStmt;
@@ -49,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.SnapshotTask;
@@ -222,6 +224,49 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
         }
     }
 
+    public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
+        tryLock();
+        try {
+            Repository repo = repoMgr.getRepo(stmt.getName());
+            if (repo == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, 
"Repository does not exist");
+            }
+
+            if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
+                Map<String, String> oldProperties = new 
HashMap<>(stmt.getProperties());
+                Status status = 
repo.alterRepositoryS3Properties(oldProperties);
+                if (!status.ok()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, 
status.getErrMsg());
+                }
+                RemoteFileSystem fileSystem = 
FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
+                        StorageBackend.StorageType.S3, oldProperties);
+                Repository newRepo = new Repository(repo.getId(), 
repo.getName(), repo.isReadOnly(),
+                        repo.getLocation(), fileSystem);
+                if (!newRepo.ping()) {
+                    LOG.warn("Failed to connect repository {}. msg: {}", 
repo.getName(), repo.getErrorMsg());
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                            "Repo can not ping with new s3 properties");
+                }
+
+                Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
+                if (!st.ok()) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                            "Failed to alter repository: " + st.getErrMsg());
+                }
+                for (AbstractJob job : getAllCurrentJobs()) {
+                    if (!job.isDone() && job.getRepoId() == repo.getId()) {
+                        job.updateRepo(newRepo);
+                    }
+                }
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                                                "Only support alter s3 
repository");
+            }
+        } finally {
+            seqlock.unlock();
+        }
+    }
+
     // handle drop repository stmt
     public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
         tryLock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index d764b926e79..bed797b9167 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -273,6 +273,28 @@ public class BackupJob extends AbstractJob {
         return state == BackupJobState.CANCELLED;
     }
 
+    @Override
+    public synchronized Status updateRepo(Repository repo) {
+        this.repo = repo;
+
+        if (this.state == BackupJobState.UPLOADING) {
+            for (Map.Entry<Long, Long> entry : unfinishedTaskIds.entrySet()) {
+                long signature = entry.getKey();
+                long beId = entry.getValue();
+                AgentTask task = AgentTaskQueue.getTask(beId, 
TTaskType.UPLOAD, signature);
+                if (task == null || task.getTaskType() != TTaskType.UPLOAD) {
+                    continue;
+                }
+                ((UploadTask) task).updateBrokerProperties(
+                                
S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, 
task);
+            }
+            LOG.info("finished to update upload job properties. {}", this);
+        }
+        LOG.info("finished to update repo of job. {}", this);
+        return Status.OK;
+    }
+
     // Polling the job state and do the right things.
     @Override
     public synchronized void run() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 58dff0d7324..3c5fdd3578e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFile;
@@ -58,7 +59,10 @@ import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 
 /*
@@ -295,6 +299,37 @@ public class Repository implements Writable {
         }
     }
 
+    public Status alterRepositoryS3Properties(Map<String, String> properties) {
+        if (fileSystem instanceof S3FileSystem) {
+            Map<String, String> oldProperties = new 
HashMap<>(this.getRemoteFileSystem().getProperties());
+            oldProperties.remove(S3Properties.ACCESS_KEY);
+            oldProperties.remove(S3Properties.SECRET_KEY);
+            oldProperties.remove(S3Properties.SESSION_TOKEN);
+            oldProperties.remove(S3Properties.Env.ACCESS_KEY);
+            oldProperties.remove(S3Properties.Env.SECRET_KEY);
+            oldProperties.remove(S3Properties.Env.TOKEN);
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
+                        || Objects.equals(entry.getKey(), 
S3Properties.Env.ACCESS_KEY)) {
+                    oldProperties.putIfAbsent(S3Properties.ACCESS_KEY, 
entry.getValue());
+                }
+                if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
+                        || Objects.equals(entry.getKey(), 
S3Properties.Env.SECRET_KEY)) {
+                    oldProperties.putIfAbsent(S3Properties.SECRET_KEY, 
entry.getValue());
+                }
+                if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
+                        || Objects.equals(entry.getKey(), 
S3Properties.Env.TOKEN)) {
+                    oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN, 
entry.getValue());
+                }
+            }
+            properties.clear();
+            properties.putAll(oldProperties);
+            return Status.OK;
+        } else {
+            return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 
repository");
+        }
+    }
+
     // eg: location/__palo_repository_repo_name/__repo_info
     public String assembleRepoInfoFilePath() {
         return Joiner.on(PATH_DELIMITER).join(location,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
index 3866c55331c..e923ba4e044 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
@@ -21,6 +21,7 @@ import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Daemon;
+import org.apache.doris.fs.remote.S3FileSystem;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -96,6 +97,31 @@ public class RepositoryMgr extends Daemon implements 
Writable {
         return repoIdMap.get(repoId);
     }
 
+    public Status alterRepo(Repository newRepo, boolean isReplay) {
+        lock.lock();
+        try {
+            Repository repo = repoNameMap.get(newRepo.getName());
+            if (repo != null) {
+                if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
+                    repoNameMap.put(repo.getName(), newRepo);
+                    repoIdMap.put(repo.getId(), newRepo);
+
+                    if (!isReplay) {
+                        // log
+                        
Env.getCurrentEnv().getEditLog().logAlterRepository(newRepo);
+                    }
+                    LOG.info("successfully alter repo {}, isReplay {}", 
newRepo.getName(), isReplay);
+                    return Status.OK;
+                } else {
+                    return new Status(ErrCode.COMMON_ERROR, "Only support 
alter s3 repository");
+                }
+            }
+            return new Status(ErrCode.NOT_FOUND, "repository does not exist");
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public Status removeRepo(String repoName, boolean isReplay) {
         lock.lock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index d0e7788a380..6d4f68e985d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -338,6 +338,28 @@ public class RestoreJob extends AbstractJob {
         return state == RestoreJobState.CANCELLED;
     }
 
+    @Override
+    public synchronized Status updateRepo(Repository repo) {
+        this.repo = repo;
+
+        if (this.state == RestoreJobState.DOWNLOADING) {
+            for (Map.Entry<Long, Long> entry : 
unfinishedSignatureToId.entrySet()) {
+                long signature = entry.getKey();
+                long beId = entry.getValue();
+                AgentTask task = AgentTaskQueue.getTask(beId, 
TTaskType.DOWNLOAD, signature);
+                if (task == null || task.getTaskType() != TTaskType.DOWNLOAD) {
+                    continue;
+                }
+                ((DownloadTask) task).updateBrokerProperties(
+                        
S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+                AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, 
task);
+            }
+            LOG.info("finished to update download job properties. {}", this);
+        }
+        LOG.info("finished to update repo of job. {}", this);
+        return Status.OK;
+    }
+
     @Override
     public void run() {
         if (state == RestoreJobState.FINISHED || state == 
RestoreJobState.CANCELLED) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 9b1d32d0642..aae8f4e4e7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -888,6 +888,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_ALTER_REPOSITORY: {
+                data = Repository.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index ba0ebc35c56..ea8292712af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1133,6 +1133,11 @@ public class EditLog {
                     env.getAlterInstance().processAlterMTMV(alterMtmv, true);
                     break;
                 }
+                case OperationType.OP_ALTER_REPOSITORY: {
+                    Repository repository = (Repository) journal.getData();
+                    env.getBackupHandler().getRepoMgr().alterRepo(repository, 
true);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1544,6 +1549,10 @@ public class EditLog {
         logEdit(OperationType.OP_DROP_REPOSITORY, new Text(repoName));
     }
 
+    public void logAlterRepository(Repository repo) {
+        logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
+    }
+
     public void logRestoreJob(RestoreJob job) {
         logEdit(OperationType.OP_RESTORE_JOB, job);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index ada6adf53d2..87b6b5bacfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -351,6 +351,8 @@ public class OperationType {
 
     public static final short OP_ALTER_MTMV = 459;
 
+    public static final short OP_ALTER_REPOSITORY = 460;
+
     /**
      * Get opcode name by op code.
      **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 7a47dbcb541..47682cd74f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -39,6 +39,7 @@ import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
 import org.apache.doris.analysis.AlterDatabaseRename;
 import org.apache.doris.analysis.AlterJobStatusStmt;
 import org.apache.doris.analysis.AlterPolicyStmt;
+import org.apache.doris.analysis.AlterRepositoryStmt;
 import org.apache.doris.analysis.AlterResourceStmt;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
@@ -388,6 +389,8 @@ public class DdlExecutor {
         } else if (ddlStmt instanceof DropAnalyzeJobStmt) {
             DropAnalyzeJobStmt analyzeJobStmt = (DropAnalyzeJobStmt) ddlStmt;
             
Env.getCurrentEnv().getAnalysisManager().dropAnalyzeJob(analyzeJobStmt);
+        } else if (ddlStmt instanceof AlterRepositoryStmt) {
+            env.getBackupHandler().alterRepository((AlterRepositoryStmt) 
ddlStmt);
         } else {
             LOG.warn("Unkown statement " + ddlStmt.getClass());
             throw new DdlException("Unknown statement.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 1677701272c..c22f0b2ee36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -135,6 +135,18 @@ public class AgentTaskQueue {
         return signatureMap.get(signature);
     }
 
+    public static synchronized void updateTask(long backendId, TTaskType type, 
long signature, AgentTask newTask) {
+        if (!tasks.contains(backendId, type)) {
+            return;
+        }
+
+        Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
+        if (!signatureMap.containsKey(signature)) {
+            return;
+        }
+        signatureMap.put(signature, newTask);
+    }
+
     // this is just for unit test
     public static synchronized List<AgentTask> getTask(TTaskType type) {
         List<AgentTask> res = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
index 6482c5f807b..5dbbc46b7b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -78,6 +78,10 @@ public class DownloadTask extends AgentTask {
         return brokerProperties;
     }
 
+    public void updateBrokerProperties(Map<String, String> brokerProperties) {
+        this.brokerProperties = new java.util.HashMap<>(brokerProperties);
+    }
+
     public TDownloadReq toThrift() {
         // these fields are required
         // 1: required i64 job_id
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
index f41d722825a..b2a4ae489ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/UploadTask.java
@@ -24,6 +24,7 @@ import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUploadReq;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class UploadTask extends AgentTask {
@@ -64,6 +65,10 @@ public class UploadTask extends AgentTask {
         return brokerProperties;
     }
 
+    public void updateBrokerProperties(Map<String, String> brokerProperties) {
+        this.brokerProperties = new HashMap<>(brokerProperties);
+    }
+
     public TUploadReq toThrift() {
         TNetworkAddress address = new TNetworkAddress(broker.host, 
broker.port);
         TUploadReq request = new TUploadReq(jobId, srcToDestPath, address);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to