This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ba4e934 BIGTOP-4324: Update agent cache files before job runs (#146)
3ba4e934 is described below

commit 3ba4e93401f70fd837866add2986770f8ab7a857
Author: Zhiguo Wu <[email protected]>
AuthorDate: Sat Jan 11 14:15:13 2025 +0800

    BIGTOP-4324: Update agent cache files before job runs (#146)
---
 .../JobCacheServiceGrpcImpl.java}                  |  53 +++---
 .../src/main/resources/proto/job_cache.proto       |  36 ++++
 .../JobCacheHelper.java}                           | 183 ++++++++-------------
 .../manager/server/command/job/AbstractJob.java    |  26 +--
 .../server/command/job/cluster/ClusterAddJob.java  |  14 +-
 .../command/job/component/ComponentAddJob.java     |   3 -
 .../server/command/job/host/HostAddJob.java        |   2 -
 .../server/command/job/service/ServiceAddJob.java  |   3 -
 .../command/job/service/ServiceConfigureJob.java   |   3 -
 .../server/command/stage/CacheFileUpdateStage.java |  82 ---------
 10 files changed, 155 insertions(+), 250 deletions(-)

diff --git 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java
 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java
similarity index 52%
rename from 
bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java
rename to 
bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java
index 74b014cf..b5b9891c 100644
--- 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java
+++ 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java
@@ -16,24 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bigtop.manager.agent.executor;
+package org.apache.bigtop.manager.agent.service;
 
 import org.apache.bigtop.manager.common.constants.MessageConstants;
 import 
org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload;
 import org.apache.bigtop.manager.common.utils.JsonUtils;
 import org.apache.bigtop.manager.common.utils.ProjectPathUtils;
-import org.apache.bigtop.manager.grpc.generated.CommandType;
-
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.context.annotation.Scope;
-import org.springframework.stereotype.Component;
+import org.apache.bigtop.manager.grpc.generated.JobCacheReply;
+import org.apache.bigtop.manager.grpc.generated.JobCacheRequest;
+import org.apache.bigtop.manager.grpc.generated.JobCacheServiceGrpc;
 
+import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
+import net.devh.boot.grpc.server.service.GrpcService;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.text.MessageFormat;
 
 import static 
org.apache.bigtop.manager.common.constants.CacheFiles.CLUSTER_INFO;
 import static 
org.apache.bigtop.manager.common.constants.CacheFiles.COMPONENTS_INFO;
@@ -43,41 +42,33 @@ import static 
org.apache.bigtop.manager.common.constants.CacheFiles.REPOS_INFO;
 import static org.apache.bigtop.manager.common.constants.CacheFiles.USERS_INFO;
 
 @Slf4j
-@Component
-@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-public class CacheFileUpdateCommandExecutor extends AbstractCommandExecutor {
-
-    @Override
-    public CommandType getCommandType() {
-        return CommandType.UPDATE_CACHE_FILES;
-    }
+@GrpcService
+public class JobCacheServiceGrpcImpl extends 
JobCacheServiceGrpc.JobCacheServiceImplBase {
 
     @Override
-    public void doExecute() {
-        CacheMessagePayload cacheMessagePayload =
-                JsonUtils.readFromString(commandRequest.getPayload(), 
CacheMessagePayload.class);
+    public void save(JobCacheRequest request, StreamObserver<JobCacheReply> 
responseObserver) {
+        CacheMessagePayload payload = 
JsonUtils.readFromString(request.getPayload(), CacheMessagePayload.class);
         String cacheDir = ProjectPathUtils.getAgentCachePath();
         Path p = Paths.get(cacheDir);
         if (!Files.exists(p)) {
             try {
                 Files.createDirectories(p);
             } catch (Exception e) {
-                log.error("Create directory failed: {}", cacheDir, e);
-                commandReplyBuilder.setCode(MessageConstants.FAIL_CODE);
-                commandReplyBuilder.setResult(
-                        MessageFormat.format("Create directory {0}, failed: 
{1}", cacheDir, e.getMessage()));
-                return;
+                responseObserver.onError(e);
             }
         }
 
-        JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO, 
cacheMessagePayload.getConfigurations());
-        JsonUtils.writeToFile(cacheDir + HOSTS_INFO, 
cacheMessagePayload.getClusterHostInfo());
-        JsonUtils.writeToFile(cacheDir + USERS_INFO, 
cacheMessagePayload.getUserInfo());
-        JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO, 
cacheMessagePayload.getComponentInfo());
-        JsonUtils.writeToFile(cacheDir + REPOS_INFO, 
cacheMessagePayload.getRepoInfo());
-        JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, 
cacheMessagePayload.getClusterInfo());
+        JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO, 
payload.getConfigurations());
+        JsonUtils.writeToFile(cacheDir + HOSTS_INFO, 
payload.getClusterHostInfo());
+        JsonUtils.writeToFile(cacheDir + USERS_INFO, payload.getUserInfo());
+        JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO, 
payload.getComponentInfo());
+        JsonUtils.writeToFile(cacheDir + REPOS_INFO, payload.getRepoInfo());
+        JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, 
payload.getClusterInfo());
 
-        commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
-        commandReplyBuilder.setResult("Successfully cached files");
+        JobCacheReply reply = JobCacheReply.newBuilder()
+                .setCode(MessageConstants.SUCCESS_CODE)
+                .build();
+        responseObserver.onNext(reply);
+        responseObserver.onCompleted();
     }
 }
diff --git a/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto 
b/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto
new file mode 100644
index 00000000..660bf153
--- /dev/null
+++ b/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.bigtop.manager.grpc.generated";
+option java_outer_classname = "JobCacheProto";
+
+service JobCacheService {
+  rpc save (JobCacheRequest) returns (JobCacheReply) {}
+}
+
+message JobCacheRequest {
+  int64 job_id = 1;
+  string payload = 2;
+}
+
+message JobCacheReply {
+  int32 code = 1;
+}
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java
similarity index 53%
rename from 
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java
rename to 
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java
index 42468e4e..9bea2e67 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bigtop.manager.server.command.task;
+package org.apache.bigtop.manager.server.command.helper;
 
-import org.apache.bigtop.manager.common.enums.Command;
+import org.apache.bigtop.manager.common.constants.MessageConstants;
 import 
org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload;
 import org.apache.bigtop.manager.common.message.entity.pojo.ClusterInfo;
-import org.apache.bigtop.manager.common.message.entity.pojo.ComponentInfo;
 import org.apache.bigtop.manager.common.message.entity.pojo.RepoInfo;
 import org.apache.bigtop.manager.common.utils.JsonUtils;
 import org.apache.bigtop.manager.dao.po.ClusterPO;
@@ -35,8 +34,11 @@ import org.apache.bigtop.manager.dao.repository.ComponentDao;
 import org.apache.bigtop.manager.dao.repository.HostDao;
 import org.apache.bigtop.manager.dao.repository.RepoDao;
 import org.apache.bigtop.manager.dao.repository.ServiceConfigDao;
-import org.apache.bigtop.manager.grpc.generated.CommandRequest;
-import org.apache.bigtop.manager.grpc.generated.CommandType;
+import org.apache.bigtop.manager.grpc.generated.JobCacheReply;
+import org.apache.bigtop.manager.grpc.generated.JobCacheRequest;
+import org.apache.bigtop.manager.grpc.generated.JobCacheServiceGrpc;
+import org.apache.bigtop.manager.server.exception.ServerException;
+import org.apache.bigtop.manager.server.grpc.GrpcClient;
 import org.apache.bigtop.manager.server.holder.SpringContextHolder;
 import org.apache.bigtop.manager.server.model.converter.RepoConverter;
 import org.apache.bigtop.manager.server.model.dto.ServiceDTO;
@@ -49,59 +51,72 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static 
org.apache.bigtop.manager.common.constants.Constants.ALL_HOST_KEY;
 
-public class CacheFileUpdateTask extends AbstractTask {
+public class JobCacheHelper {
 
-    private ClusterDao clusterDao;
-    private ServiceConfigDao serviceConfigDao;
-    private RepoDao repoDao;
-    private HostDao hostDao;
-    private ComponentDao componentDao;
+    private static ClusterDao clusterDao;
+    private static ServiceConfigDao serviceConfigDao;
+    private static RepoDao repoDao;
+    private static HostDao hostDao;
+    private static ComponentDao componentDao;
 
-    private ClusterInfo clusterInfo;
-    private Map<String, ComponentInfo> componentInfoMap;
-    private Map<String, Map<String, Object>> serviceConfigMap;
-    private Map<String, Set<String>> hostMap;
-    private List<RepoInfo> repoList;
-    private Map<String, String> userMap;
+    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
 
-    public CacheFileUpdateTask(TaskContext taskContext) {
-        super(taskContext);
-    }
-
-    @Override
-    protected void injectBeans() {
-        super.injectBeans();
+    private static void initialize() {
+        clusterDao = SpringContextHolder.getBean(ClusterDao.class);
+        serviceConfigDao = SpringContextHolder.getBean(ServiceConfigDao.class);
+        repoDao = SpringContextHolder.getBean(RepoDao.class);
+        hostDao = SpringContextHolder.getBean(HostDao.class);
+        componentDao = SpringContextHolder.getBean(ComponentDao.class);
 
-        this.clusterDao = SpringContextHolder.getBean(ClusterDao.class);
-        this.serviceConfigDao = 
SpringContextHolder.getBean(ServiceConfigDao.class);
-        this.repoDao = SpringContextHolder.getBean(RepoDao.class);
-        this.hostDao = SpringContextHolder.getBean(HostDao.class);
-        this.componentDao = SpringContextHolder.getBean(ComponentDao.class);
+        INITIALIZED.set(true);
     }
 
-    @Override
-    public void beforeRun() {
-        super.beforeRun();
+    public static void sendJobCache(Long clusterId, Long jobId, List<String> 
hostnames) {
+        CacheMessagePayload payload = genPayload(clusterId);
+        JobCacheRequest request = JobCacheRequest.newBuilder()
+                .setJobId(jobId)
+                .setPayload(JsonUtils.writeAsString(payload))
+                .build();
+        List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
+        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
+        for (HostPO hostPO : hostPOList) {
+            futures.add(CompletableFuture.supplyAsync(() -> {
+                JobCacheServiceGrpc.JobCacheServiceBlockingStub stub = 
GrpcClient.getBlockingStub(
+                        hostPO.getHostname(),
+                        hostPO.getGrpcPort(),
+                        JobCacheServiceGrpc.JobCacheServiceBlockingStub.class);
+                JobCacheReply reply = stub.save(request);
+                return reply != null && reply.getCode() == 
MessageConstants.SUCCESS_CODE;
+            }));
+        }
 
-        genCaches();
+        List<Boolean> results = futures.stream()
+                .map((future) -> {
+                    try {
+                        return future.get();
+                    } catch (Exception e) {
+                        return false;
+                    }
+                })
+                .toList();
+
+        boolean allSuccess = results.stream().allMatch(Boolean::booleanValue);
+        if (!allSuccess) {
+            throw new ServerException("Failed to send job cache");
+        }
     }
 
-    private void genCaches() {
-        if (taskContext.getClusterId() == null) {
-            genEmptyCaches();
-        } else {
-            genFullCaches();
+    private static CacheMessagePayload genPayload(Long clusterId) {
+        if (!INITIALIZED.get()) {
+            initialize();
         }
-    }
 
-    @SuppressWarnings("unchecked")
-    private void genFullCaches() {
-        Long clusterId = taskContext.getClusterId();
-        List<String> hostnames = (List<String>) 
taskContext.getProperties().get("hostnames");
         ClusterPO clusterPO = clusterDao.findById(clusterId);
 
         ComponentQuery componentQuery =
@@ -110,14 +125,14 @@ public class CacheFileUpdateTask extends AbstractTask {
         List<ServiceConfigPO> serviceConfigPOList = 
serviceConfigDao.findByClusterId(clusterPO.getId());
         List<ComponentPO> componentPOList = 
componentDao.findByQuery(componentQuery);
         List<RepoPO> repoPOList = repoDao.findAll();
-        List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
+        List<HostPO> hostPOList = hostDao.findAll();
 
-        clusterInfo = new ClusterInfo();
+        ClusterInfo clusterInfo = new ClusterInfo();
         clusterInfo.setName(clusterPO.getName());
         clusterInfo.setUserGroup(clusterPO.getUserGroup());
         clusterInfo.setRootDir(clusterPO.getRootDir());
 
-        serviceConfigMap = new HashMap<>();
+        Map<String, Map<String, Object>> serviceConfigMap = new HashMap<>();
         for (ServiceConfigPO serviceConfigPO : serviceConfigPOList) {
             List<Map<String, Object>> properties = 
JsonUtils.readFromString(serviceConfigPO.getPropertiesJson());
             Map<String, String> kvMap = properties.stream()
@@ -134,7 +149,7 @@ public class CacheFileUpdateTask extends AbstractTask {
             }
         }
 
-        hostMap = new HashMap<>();
+        Map<String, Set<String>> hostMap = new HashMap<>();
         componentPOList.forEach(x -> {
             if (hostMap.containsKey(x.getName())) {
                 hostMap.get(x.getName()).add(x.getHostname());
@@ -149,81 +164,25 @@ public class CacheFileUpdateTask extends AbstractTask {
         Set<String> hostNameSet = 
hostPOList.stream().map(HostPO::getHostname).collect(Collectors.toSet());
         hostMap.put(ALL_HOST_KEY, hostNameSet);
 
-        repoList = new ArrayList<>();
+        List<RepoInfo> repoList = new ArrayList<>();
         repoPOList.forEach(repoPO -> {
             RepoInfo repoInfo = RepoConverter.INSTANCE.fromPO2Message(repoPO);
             repoList.add(repoInfo);
         });
 
-        userMap = new HashMap<>();
+        Map<String, String> userMap = new HashMap<>();
         for (StackDTO stackDTO : StackUtils.getAllStacks()) {
             for (ServiceDTO serviceDTO : 
StackUtils.getServiceDTOList(stackDTO)) {
                 userMap.put(serviceDTO.getName(), serviceDTO.getUser());
             }
         }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void genEmptyCaches() {
-        List<String> hostnames = (List<String>) 
taskContext.getProperties().get("hostnames");
-
-        List<RepoPO> repoPOList = repoDao.findAll();
-        List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
-
-        componentInfoMap = new HashMap<>();
-        serviceConfigMap = new HashMap<>();
-
-        clusterInfo = new ClusterInfo();
-        clusterInfo.setUserGroup(taskContext.getUserGroup());
-        clusterInfo.setRootDir(taskContext.getRootDir());
-
-        hostMap = new HashMap<>();
-        Set<String> hostNameSet = 
hostPOList.stream().map(HostPO::getHostname).collect(Collectors.toSet());
-        hostMap.put(ALL_HOST_KEY, hostNameSet);
-
-        repoList = new ArrayList<>();
-        repoPOList.forEach(repoPO -> {
-            RepoInfo repoInfo = RepoConverter.INSTANCE.fromPO2Message(repoPO);
-            repoList.add(repoInfo);
-        });
-
-        userMap = new HashMap<>();
-        for (StackDTO stackDTO : StackUtils.getAllStacks()) {
-            for (ServiceDTO serviceDTO : 
StackUtils.getServiceDTOList(stackDTO)) {
-                userMap.put(serviceDTO.getName(), serviceDTO.getUser());
-            }
-        }
-    }
-
-    @Override
-    protected Command getCommand() {
-        return Command.CUSTOM;
-    }
-
-    @Override
-    protected String getCustomCommand() {
-        return "update_cache_files";
-    }
-
-    @Override
-    protected CommandRequest getCommandRequest() {
-        CacheMessagePayload messagePayload = new CacheMessagePayload();
-        messagePayload.setClusterInfo(clusterInfo);
-        messagePayload.setConfigurations(serviceConfigMap);
-        messagePayload.setClusterHostInfo(hostMap);
-        messagePayload.setRepoInfo(repoList);
-        messagePayload.setUserInfo(userMap);
-        messagePayload.setComponentInfo(componentInfoMap);
-
-        CommandRequest.Builder builder = CommandRequest.newBuilder();
-        builder.setType(CommandType.UPDATE_CACHE_FILES);
-        builder.setPayload(JsonUtils.writeAsString(messagePayload));
-
-        return builder.build();
-    }
 
-    @Override
-    public String getName() {
-        return "Update cache files on " + taskContext.getHostname();
+        CacheMessagePayload payload = new CacheMessagePayload();
+        payload.setClusterInfo(clusterInfo);
+        payload.setConfigurations(serviceConfigMap);
+        payload.setClusterHostInfo(hostMap);
+        payload.setRepoInfo(repoList);
+        payload.setUserInfo(userMap);
+        return payload;
     }
 }
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
index 1f72ba5d..02f07ecf 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
@@ -28,7 +28,7 @@ import org.apache.bigtop.manager.dao.repository.ClusterDao;
 import org.apache.bigtop.manager.dao.repository.JobDao;
 import org.apache.bigtop.manager.dao.repository.StageDao;
 import org.apache.bigtop.manager.dao.repository.TaskDao;
-import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
+import org.apache.bigtop.manager.server.command.helper.JobCacheHelper;
 import org.apache.bigtop.manager.server.command.stage.Stage;
 import org.apache.bigtop.manager.server.command.stage.StageContext;
 import org.apache.bigtop.manager.server.command.task.Task;
@@ -84,11 +84,6 @@ public abstract class AbstractJob implements Job {
 
     protected abstract void createStages();
 
-    protected void createCacheStage() {
-        StageContext stageContext = 
StageContext.fromCommandDTO(jobContext.getCommandDTO());
-        stages.add(new CacheFileUpdateStage(stageContext));
-    }
-
     @Override
     public void beforeRun() {
         jobPO.setState(JobState.PROCESSING.getName());
@@ -100,8 +95,18 @@ public abstract class AbstractJob implements Job {
         boolean success = true;
 
         try {
+            // Persist job state and required data.
             beforeRun();
 
+            // Send job cache to agents
+            List<String> hostnames = stages.stream()
+                    .map(Stage::getStageContext)
+                    .map(StageContext::getHostnames)
+                    .flatMap(List::stream)
+                    .distinct()
+                    .toList();
+            JobCacheHelper.sendJobCache(clusterPO.getId(), jobPO.getId(), 
hostnames);
+
             LinkedBlockingQueue<Stage> queue = new 
LinkedBlockingQueue<>(stages);
             while (!queue.isEmpty()) {
                 Stage stage = queue.poll();
@@ -152,12 +157,9 @@ public abstract class AbstractJob implements Job {
                 }
             }
         }
-        if (!taskPOList.isEmpty()) {
-            taskDao.partialUpdateByIds(taskPOList);
-        }
-        if (!stagePOList.isEmpty()) {
-            stageDao.partialUpdateByIds(stagePOList);
-        }
+
+        taskDao.partialUpdateByIds(taskPOList);
+        stageDao.partialUpdateByIds(stagePOList);
         jobDao.partialUpdateById(jobPO);
     }
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
index f8650456..0661b322 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
@@ -23,7 +23,6 @@ import org.apache.bigtop.manager.dao.po.StagePO;
 import org.apache.bigtop.manager.dao.po.TaskPO;
 import org.apache.bigtop.manager.server.command.job.AbstractJob;
 import org.apache.bigtop.manager.server.command.job.JobContext;
-import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
 import org.apache.bigtop.manager.server.command.stage.HostCheckStage;
 import org.apache.bigtop.manager.server.command.stage.SetupJdkStage;
 import org.apache.bigtop.manager.server.command.stage.Stage;
@@ -54,11 +53,22 @@ public class ClusterAddJob extends AbstractJob {
         hostService = SpringContextHolder.getBean(HostService.class);
     }
 
+    @Override
+    protected void beforeCreateStages() {
+        super.beforeCreateStages();
+
+        if (jobContext.getRetryFlag()) {
+            // Cluster already created, but command still doesn't have cluster 
id
+            // So we need to find the cluster by name
+            String clusterName = 
jobContext.getCommandDTO().getClusterCommand().getName();
+            clusterPO = clusterDao.findByName(clusterName);
+        }
+    }
+
     @Override
     protected void createStages() {
         StageContext stageContext = 
StageContext.fromCommandDTO(jobContext.getCommandDTO());
         stages.add(new HostCheckStage(stageContext));
-        stages.add(new CacheFileUpdateStage(stageContext));
         stages.add(new SetupJdkStage(stageContext));
     }
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
index c95fd60b..99a3fd01 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
@@ -49,9 +49,6 @@ public class ComponentAddJob extends AbstractComponentJob {
 
     @Override
     protected void createStages() {
-        // Update cache files
-        super.createCacheStage();
-
         CommandDTO commandDTO = jobContext.getCommandDTO();
         Map<String, List<String>> componentHostsMap = getComponentHostsMap();
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
index a10f240a..ba045d31 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
@@ -19,7 +19,6 @@
 package org.apache.bigtop.manager.server.command.job.host;
 
 import org.apache.bigtop.manager.server.command.job.JobContext;
-import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
 import org.apache.bigtop.manager.server.command.stage.HostCheckStage;
 import org.apache.bigtop.manager.server.command.stage.SetupJdkStage;
 import org.apache.bigtop.manager.server.command.stage.StageContext;
@@ -50,7 +49,6 @@ public class HostAddJob extends AbstractHostJob {
     protected void createStages() {
         StageContext stageContext = 
StageContext.fromCommandDTO(jobContext.getCommandDTO());
         stages.add(new HostCheckStage(stageContext));
-        stages.add(new CacheFileUpdateStage(stageContext));
         stages.add(new SetupJdkStage(stageContext));
     }
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
index 690f3542..fb4ba777 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
@@ -58,9 +58,6 @@ public class ServiceAddJob extends AbstractServiceJob {
 
     @Override
     protected void createStages() {
-        // Update cache files
-        super.createCacheStage();
-
         CommandDTO commandDTO = jobContext.getCommandDTO();
         Map<String, List<String>> componentHostsMap = getComponentHostsMap();
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
index f6325685..93c61bba 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
@@ -34,9 +34,6 @@ public class ServiceConfigureJob extends AbstractServiceJob {
 
     @Override
     protected void createStages() {
-        // Update cache files
-        super.createCacheStage();
-
         CommandDTO commandDTO = jobContext.getCommandDTO();
         Map<String, List<String>> componentHostsMap = getComponentHostsMap();
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java
deleted file mode 100644
index 88c57081..00000000
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bigtop.manager.server.command.stage;
-
-import org.apache.bigtop.manager.dao.po.HostPO;
-import org.apache.bigtop.manager.server.command.task.CacheFileUpdateTask;
-import org.apache.bigtop.manager.server.command.task.Task;
-import org.apache.bigtop.manager.server.command.task.TaskContext;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CacheFileUpdateStage extends AbstractStage {
-
-    public CacheFileUpdateStage(StageContext stageContext) {
-        super(stageContext);
-    }
-
-    @Override
-    protected void injectBeans() {
-        super.injectBeans();
-    }
-
-    @Override
-    protected void beforeCreateTasks() {
-        List<String> hostnames = new ArrayList<>();
-
-        if (stageContext.getClusterId() == null) {
-            hostnames.addAll(stageContext.getHostnames() == null ? List.of() : 
stageContext.getHostnames());
-        } else {
-            hostnames.addAll(stageContext.getHostnames() == null ? List.of() : 
stageContext.getHostnames());
-            
hostnames.addAll(hostDao.findAllByClusterId(stageContext.getClusterId()).stream()
-                    .map(HostPO::getHostname)
-                    .toList());
-        }
-
-        stageContext.setHostnames(hostnames);
-    }
-
-    @Override
-    protected Task createTask(String hostname) {
-        TaskContext taskContext = new TaskContext();
-        taskContext.setHostname(hostname);
-        taskContext.setClusterId(stageContext.getClusterId());
-        taskContext.setClusterName(stageContext.getClusterName());
-        taskContext.setUserGroup(stageContext.getUserGroup());
-        taskContext.setRootDir(stageContext.getRootDir());
-        taskContext.setServiceName("cluster");
-        taskContext.setServiceUser("root");
-        taskContext.setComponentName("agent");
-        taskContext.setComponentDisplayName("Agent");
-
-        Map<String, Object> properties = new HashMap<>();
-        properties.put("hostnames", stageContext.getHostnames());
-        taskContext.setProperties(properties);
-
-        return new CacheFileUpdateTask(taskContext);
-    }
-
-    @Override
-    public String getName() {
-        return "Update cache files";
-    }
-}

Reply via email to