This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git
The following commit(s) were added to refs/heads/main by this push:
new 3ba4e934 BIGTOP-4324: Update agent cache files before job runs (#146)
3ba4e934 is described below
commit 3ba4e93401f70fd837866add2986770f8ab7a857
Author: Zhiguo Wu <[email protected]>
AuthorDate: Sat Jan 11 14:15:13 2025 +0800
BIGTOP-4324: Update agent cache files before job runs (#146)
---
.../JobCacheServiceGrpcImpl.java} | 53 +++---
.../src/main/resources/proto/job_cache.proto | 36 ++++
.../JobCacheHelper.java} | 183 ++++++++-------------
.../manager/server/command/job/AbstractJob.java | 26 +--
.../server/command/job/cluster/ClusterAddJob.java | 14 +-
.../command/job/component/ComponentAddJob.java | 3 -
.../server/command/job/host/HostAddJob.java | 2 -
.../server/command/job/service/ServiceAddJob.java | 3 -
.../command/job/service/ServiceConfigureJob.java | 3 -
.../server/command/stage/CacheFileUpdateStage.java | 82 ---------
10 files changed, 155 insertions(+), 250 deletions(-)
diff --git
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java
similarity index 52%
rename from
bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java
rename to
bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java
index 74b014cf..b5b9891c 100644
---
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java
+++
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java
@@ -16,24 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bigtop.manager.agent.executor;
+package org.apache.bigtop.manager.agent.service;
import org.apache.bigtop.manager.common.constants.MessageConstants;
import
org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.common.utils.ProjectPathUtils;
-import org.apache.bigtop.manager.grpc.generated.CommandType;
-
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.context.annotation.Scope;
-import org.springframework.stereotype.Component;
+import org.apache.bigtop.manager.grpc.generated.JobCacheReply;
+import org.apache.bigtop.manager.grpc.generated.JobCacheRequest;
+import org.apache.bigtop.manager.grpc.generated.JobCacheServiceGrpc;
+import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
+import net.devh.boot.grpc.server.service.GrpcService;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.text.MessageFormat;
import static
org.apache.bigtop.manager.common.constants.CacheFiles.CLUSTER_INFO;
import static
org.apache.bigtop.manager.common.constants.CacheFiles.COMPONENTS_INFO;
@@ -43,41 +42,33 @@ import static
org.apache.bigtop.manager.common.constants.CacheFiles.REPOS_INFO;
import static org.apache.bigtop.manager.common.constants.CacheFiles.USERS_INFO;
@Slf4j
-@Component
-@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-public class CacheFileUpdateCommandExecutor extends AbstractCommandExecutor {
-
- @Override
- public CommandType getCommandType() {
- return CommandType.UPDATE_CACHE_FILES;
- }
+@GrpcService
+public class JobCacheServiceGrpcImpl extends
JobCacheServiceGrpc.JobCacheServiceImplBase {
@Override
- public void doExecute() {
- CacheMessagePayload cacheMessagePayload =
- JsonUtils.readFromString(commandRequest.getPayload(),
CacheMessagePayload.class);
+ public void save(JobCacheRequest request, StreamObserver<JobCacheReply>
responseObserver) {
+ CacheMessagePayload payload =
JsonUtils.readFromString(request.getPayload(), CacheMessagePayload.class);
String cacheDir = ProjectPathUtils.getAgentCachePath();
Path p = Paths.get(cacheDir);
if (!Files.exists(p)) {
try {
Files.createDirectories(p);
} catch (Exception e) {
- log.error("Create directory failed: {}", cacheDir, e);
- commandReplyBuilder.setCode(MessageConstants.FAIL_CODE);
- commandReplyBuilder.setResult(
- MessageFormat.format("Create directory {0}, failed:
{1}", cacheDir, e.getMessage()));
- return;
+ responseObserver.onError(e);
}
}
- JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO,
cacheMessagePayload.getConfigurations());
- JsonUtils.writeToFile(cacheDir + HOSTS_INFO,
cacheMessagePayload.getClusterHostInfo());
- JsonUtils.writeToFile(cacheDir + USERS_INFO,
cacheMessagePayload.getUserInfo());
- JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO,
cacheMessagePayload.getComponentInfo());
- JsonUtils.writeToFile(cacheDir + REPOS_INFO,
cacheMessagePayload.getRepoInfo());
- JsonUtils.writeToFile(cacheDir + CLUSTER_INFO,
cacheMessagePayload.getClusterInfo());
+ JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO,
payload.getConfigurations());
+ JsonUtils.writeToFile(cacheDir + HOSTS_INFO,
payload.getClusterHostInfo());
+ JsonUtils.writeToFile(cacheDir + USERS_INFO, payload.getUserInfo());
+ JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO,
payload.getComponentInfo());
+ JsonUtils.writeToFile(cacheDir + REPOS_INFO, payload.getRepoInfo());
+ JsonUtils.writeToFile(cacheDir + CLUSTER_INFO,
payload.getClusterInfo());
- commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
- commandReplyBuilder.setResult("Successfully cached files");
+ JobCacheReply reply = JobCacheReply.newBuilder()
+ .setCode(MessageConstants.SUCCESS_CODE)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
}
}
diff --git a/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto
b/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto
new file mode 100644
index 00000000..660bf153
--- /dev/null
+++ b/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.bigtop.manager.grpc.generated";
+option java_outer_classname = "JobCacheProto";
+
+service JobCacheService {
+ rpc save (JobCacheRequest) returns (JobCacheReply) {}
+}
+
+message JobCacheRequest {
+ int64 job_id = 1;
+ string payload = 2;
+}
+
+message JobCacheReply {
+ int32 code = 1;
+}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java
similarity index 53%
rename from
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java
rename to
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java
index 42468e4e..9bea2e67 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bigtop.manager.server.command.task;
+package org.apache.bigtop.manager.server.command.helper;
-import org.apache.bigtop.manager.common.enums.Command;
+import org.apache.bigtop.manager.common.constants.MessageConstants;
import
org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload;
import org.apache.bigtop.manager.common.message.entity.pojo.ClusterInfo;
-import org.apache.bigtop.manager.common.message.entity.pojo.ComponentInfo;
import org.apache.bigtop.manager.common.message.entity.pojo.RepoInfo;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.dao.po.ClusterPO;
@@ -35,8 +34,11 @@ import org.apache.bigtop.manager.dao.repository.ComponentDao;
import org.apache.bigtop.manager.dao.repository.HostDao;
import org.apache.bigtop.manager.dao.repository.RepoDao;
import org.apache.bigtop.manager.dao.repository.ServiceConfigDao;
-import org.apache.bigtop.manager.grpc.generated.CommandRequest;
-import org.apache.bigtop.manager.grpc.generated.CommandType;
+import org.apache.bigtop.manager.grpc.generated.JobCacheReply;
+import org.apache.bigtop.manager.grpc.generated.JobCacheRequest;
+import org.apache.bigtop.manager.grpc.generated.JobCacheServiceGrpc;
+import org.apache.bigtop.manager.server.exception.ServerException;
+import org.apache.bigtop.manager.server.grpc.GrpcClient;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;
import org.apache.bigtop.manager.server.model.converter.RepoConverter;
import org.apache.bigtop.manager.server.model.dto.ServiceDTO;
@@ -49,59 +51,72 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static
org.apache.bigtop.manager.common.constants.Constants.ALL_HOST_KEY;
-public class CacheFileUpdateTask extends AbstractTask {
+public class JobCacheHelper {
- private ClusterDao clusterDao;
- private ServiceConfigDao serviceConfigDao;
- private RepoDao repoDao;
- private HostDao hostDao;
- private ComponentDao componentDao;
+ private static ClusterDao clusterDao;
+ private static ServiceConfigDao serviceConfigDao;
+ private static RepoDao repoDao;
+ private static HostDao hostDao;
+ private static ComponentDao componentDao;
- private ClusterInfo clusterInfo;
- private Map<String, ComponentInfo> componentInfoMap;
- private Map<String, Map<String, Object>> serviceConfigMap;
- private Map<String, Set<String>> hostMap;
- private List<RepoInfo> repoList;
- private Map<String, String> userMap;
+ private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
- public CacheFileUpdateTask(TaskContext taskContext) {
- super(taskContext);
- }
-
- @Override
- protected void injectBeans() {
- super.injectBeans();
+ private static void initialize() {
+ clusterDao = SpringContextHolder.getBean(ClusterDao.class);
+ serviceConfigDao = SpringContextHolder.getBean(ServiceConfigDao.class);
+ repoDao = SpringContextHolder.getBean(RepoDao.class);
+ hostDao = SpringContextHolder.getBean(HostDao.class);
+ componentDao = SpringContextHolder.getBean(ComponentDao.class);
- this.clusterDao = SpringContextHolder.getBean(ClusterDao.class);
- this.serviceConfigDao =
SpringContextHolder.getBean(ServiceConfigDao.class);
- this.repoDao = SpringContextHolder.getBean(RepoDao.class);
- this.hostDao = SpringContextHolder.getBean(HostDao.class);
- this.componentDao = SpringContextHolder.getBean(ComponentDao.class);
+ INITIALIZED.set(true);
}
- @Override
- public void beforeRun() {
- super.beforeRun();
+ public static void sendJobCache(Long clusterId, Long jobId, List<String>
hostnames) {
+ CacheMessagePayload payload = genPayload(clusterId);
+ JobCacheRequest request = JobCacheRequest.newBuilder()
+ .setJobId(jobId)
+ .setPayload(JsonUtils.writeAsString(payload))
+ .build();
+ List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
+ List<CompletableFuture<Boolean>> futures = new ArrayList<>();
+ for (HostPO hostPO : hostPOList) {
+ futures.add(CompletableFuture.supplyAsync(() -> {
+ JobCacheServiceGrpc.JobCacheServiceBlockingStub stub =
GrpcClient.getBlockingStub(
+ hostPO.getHostname(),
+ hostPO.getGrpcPort(),
+ JobCacheServiceGrpc.JobCacheServiceBlockingStub.class);
+ JobCacheReply reply = stub.save(request);
+ return reply != null && reply.getCode() ==
MessageConstants.SUCCESS_CODE;
+ }));
+ }
- genCaches();
+ List<Boolean> results = futures.stream()
+ .map((future) -> {
+ try {
+ return future.get();
+ } catch (Exception e) {
+ return false;
+ }
+ })
+ .toList();
+
+ boolean allSuccess = results.stream().allMatch(Boolean::booleanValue);
+ if (!allSuccess) {
+ throw new ServerException("Failed to send job cache");
+ }
}
- private void genCaches() {
- if (taskContext.getClusterId() == null) {
- genEmptyCaches();
- } else {
- genFullCaches();
+ private static CacheMessagePayload genPayload(Long clusterId) {
+ if (!INITIALIZED.get()) {
+ initialize();
}
- }
- @SuppressWarnings("unchecked")
- private void genFullCaches() {
- Long clusterId = taskContext.getClusterId();
- List<String> hostnames = (List<String>)
taskContext.getProperties().get("hostnames");
ClusterPO clusterPO = clusterDao.findById(clusterId);
ComponentQuery componentQuery =
@@ -110,14 +125,14 @@ public class CacheFileUpdateTask extends AbstractTask {
List<ServiceConfigPO> serviceConfigPOList =
serviceConfigDao.findByClusterId(clusterPO.getId());
List<ComponentPO> componentPOList =
componentDao.findByQuery(componentQuery);
List<RepoPO> repoPOList = repoDao.findAll();
- List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
+ List<HostPO> hostPOList = hostDao.findAll();
- clusterInfo = new ClusterInfo();
+ ClusterInfo clusterInfo = new ClusterInfo();
clusterInfo.setName(clusterPO.getName());
clusterInfo.setUserGroup(clusterPO.getUserGroup());
clusterInfo.setRootDir(clusterPO.getRootDir());
- serviceConfigMap = new HashMap<>();
+ Map<String, Map<String, Object>> serviceConfigMap = new HashMap<>();
for (ServiceConfigPO serviceConfigPO : serviceConfigPOList) {
List<Map<String, Object>> properties =
JsonUtils.readFromString(serviceConfigPO.getPropertiesJson());
Map<String, String> kvMap = properties.stream()
@@ -134,7 +149,7 @@ public class CacheFileUpdateTask extends AbstractTask {
}
}
- hostMap = new HashMap<>();
+ Map<String, Set<String>> hostMap = new HashMap<>();
componentPOList.forEach(x -> {
if (hostMap.containsKey(x.getName())) {
hostMap.get(x.getName()).add(x.getHostname());
@@ -149,81 +164,25 @@ public class CacheFileUpdateTask extends AbstractTask {
Set<String> hostNameSet =
hostPOList.stream().map(HostPO::getHostname).collect(Collectors.toSet());
hostMap.put(ALL_HOST_KEY, hostNameSet);
- repoList = new ArrayList<>();
+ List<RepoInfo> repoList = new ArrayList<>();
repoPOList.forEach(repoPO -> {
RepoInfo repoInfo = RepoConverter.INSTANCE.fromPO2Message(repoPO);
repoList.add(repoInfo);
});
- userMap = new HashMap<>();
+ Map<String, String> userMap = new HashMap<>();
for (StackDTO stackDTO : StackUtils.getAllStacks()) {
for (ServiceDTO serviceDTO :
StackUtils.getServiceDTOList(stackDTO)) {
userMap.put(serviceDTO.getName(), serviceDTO.getUser());
}
}
- }
-
- @SuppressWarnings("unchecked")
- private void genEmptyCaches() {
- List<String> hostnames = (List<String>)
taskContext.getProperties().get("hostnames");
-
- List<RepoPO> repoPOList = repoDao.findAll();
- List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
-
- componentInfoMap = new HashMap<>();
- serviceConfigMap = new HashMap<>();
-
- clusterInfo = new ClusterInfo();
- clusterInfo.setUserGroup(taskContext.getUserGroup());
- clusterInfo.setRootDir(taskContext.getRootDir());
-
- hostMap = new HashMap<>();
- Set<String> hostNameSet =
hostPOList.stream().map(HostPO::getHostname).collect(Collectors.toSet());
- hostMap.put(ALL_HOST_KEY, hostNameSet);
-
- repoList = new ArrayList<>();
- repoPOList.forEach(repoPO -> {
- RepoInfo repoInfo = RepoConverter.INSTANCE.fromPO2Message(repoPO);
- repoList.add(repoInfo);
- });
-
- userMap = new HashMap<>();
- for (StackDTO stackDTO : StackUtils.getAllStacks()) {
- for (ServiceDTO serviceDTO :
StackUtils.getServiceDTOList(stackDTO)) {
- userMap.put(serviceDTO.getName(), serviceDTO.getUser());
- }
- }
- }
-
- @Override
- protected Command getCommand() {
- return Command.CUSTOM;
- }
-
- @Override
- protected String getCustomCommand() {
- return "update_cache_files";
- }
-
- @Override
- protected CommandRequest getCommandRequest() {
- CacheMessagePayload messagePayload = new CacheMessagePayload();
- messagePayload.setClusterInfo(clusterInfo);
- messagePayload.setConfigurations(serviceConfigMap);
- messagePayload.setClusterHostInfo(hostMap);
- messagePayload.setRepoInfo(repoList);
- messagePayload.setUserInfo(userMap);
- messagePayload.setComponentInfo(componentInfoMap);
-
- CommandRequest.Builder builder = CommandRequest.newBuilder();
- builder.setType(CommandType.UPDATE_CACHE_FILES);
- builder.setPayload(JsonUtils.writeAsString(messagePayload));
-
- return builder.build();
- }
- @Override
- public String getName() {
- return "Update cache files on " + taskContext.getHostname();
+ CacheMessagePayload payload = new CacheMessagePayload();
+ payload.setClusterInfo(clusterInfo);
+ payload.setConfigurations(serviceConfigMap);
+ payload.setClusterHostInfo(hostMap);
+ payload.setRepoInfo(repoList);
+ payload.setUserInfo(userMap);
+ return payload;
}
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
index 1f72ba5d..02f07ecf 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java
@@ -28,7 +28,7 @@ import org.apache.bigtop.manager.dao.repository.ClusterDao;
import org.apache.bigtop.manager.dao.repository.JobDao;
import org.apache.bigtop.manager.dao.repository.StageDao;
import org.apache.bigtop.manager.dao.repository.TaskDao;
-import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
+import org.apache.bigtop.manager.server.command.helper.JobCacheHelper;
import org.apache.bigtop.manager.server.command.stage.Stage;
import org.apache.bigtop.manager.server.command.stage.StageContext;
import org.apache.bigtop.manager.server.command.task.Task;
@@ -84,11 +84,6 @@ public abstract class AbstractJob implements Job {
protected abstract void createStages();
- protected void createCacheStage() {
- StageContext stageContext =
StageContext.fromCommandDTO(jobContext.getCommandDTO());
- stages.add(new CacheFileUpdateStage(stageContext));
- }
-
@Override
public void beforeRun() {
jobPO.setState(JobState.PROCESSING.getName());
@@ -100,8 +95,18 @@ public abstract class AbstractJob implements Job {
boolean success = true;
try {
+ // Persist job state and required data.
beforeRun();
+ // Send job cache to agents
+ List<String> hostnames = stages.stream()
+ .map(Stage::getStageContext)
+ .map(StageContext::getHostnames)
+ .flatMap(List::stream)
+ .distinct()
+ .toList();
+ JobCacheHelper.sendJobCache(clusterPO.getId(), jobPO.getId(),
hostnames);
+
LinkedBlockingQueue<Stage> queue = new
LinkedBlockingQueue<>(stages);
while (!queue.isEmpty()) {
Stage stage = queue.poll();
@@ -152,12 +157,9 @@ public abstract class AbstractJob implements Job {
}
}
}
- if (!taskPOList.isEmpty()) {
- taskDao.partialUpdateByIds(taskPOList);
- }
- if (!stagePOList.isEmpty()) {
- stageDao.partialUpdateByIds(stagePOList);
- }
+
+ taskDao.partialUpdateByIds(taskPOList);
+ stageDao.partialUpdateByIds(stagePOList);
jobDao.partialUpdateById(jobPO);
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
index f8650456..0661b322 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java
@@ -23,7 +23,6 @@ import org.apache.bigtop.manager.dao.po.StagePO;
import org.apache.bigtop.manager.dao.po.TaskPO;
import org.apache.bigtop.manager.server.command.job.AbstractJob;
import org.apache.bigtop.manager.server.command.job.JobContext;
-import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
import org.apache.bigtop.manager.server.command.stage.HostCheckStage;
import org.apache.bigtop.manager.server.command.stage.SetupJdkStage;
import org.apache.bigtop.manager.server.command.stage.Stage;
@@ -54,11 +53,22 @@ public class ClusterAddJob extends AbstractJob {
hostService = SpringContextHolder.getBean(HostService.class);
}
+ @Override
+ protected void beforeCreateStages() {
+ super.beforeCreateStages();
+
+ if (jobContext.getRetryFlag()) {
+ // Cluster already created, but command still doesn't have cluster id
+ // So we need to find the cluster by name
+ String clusterName =
jobContext.getCommandDTO().getClusterCommand().getName();
+ clusterPO = clusterDao.findByName(clusterName);
+ }
+ }
+
@Override
protected void createStages() {
StageContext stageContext =
StageContext.fromCommandDTO(jobContext.getCommandDTO());
stages.add(new HostCheckStage(stageContext));
- stages.add(new CacheFileUpdateStage(stageContext));
stages.add(new SetupJdkStage(stageContext));
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
index c95fd60b..99a3fd01 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java
@@ -49,9 +49,6 @@ public class ComponentAddJob extends AbstractComponentJob {
@Override
protected void createStages() {
- // Update cache files
- super.createCacheStage();
-
CommandDTO commandDTO = jobContext.getCommandDTO();
Map<String, List<String>> componentHostsMap = getComponentHostsMap();
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
index a10f240a..ba045d31 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java
@@ -19,7 +19,6 @@
package org.apache.bigtop.manager.server.command.job.host;
import org.apache.bigtop.manager.server.command.job.JobContext;
-import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage;
import org.apache.bigtop.manager.server.command.stage.HostCheckStage;
import org.apache.bigtop.manager.server.command.stage.SetupJdkStage;
import org.apache.bigtop.manager.server.command.stage.StageContext;
@@ -50,7 +49,6 @@ public class HostAddJob extends AbstractHostJob {
protected void createStages() {
StageContext stageContext =
StageContext.fromCommandDTO(jobContext.getCommandDTO());
stages.add(new HostCheckStage(stageContext));
- stages.add(new CacheFileUpdateStage(stageContext));
stages.add(new SetupJdkStage(stageContext));
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
index 690f3542..fb4ba777 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java
@@ -58,9 +58,6 @@ public class ServiceAddJob extends AbstractServiceJob {
@Override
protected void createStages() {
- // Update cache files
- super.createCacheStage();
-
CommandDTO commandDTO = jobContext.getCommandDTO();
Map<String, List<String>> componentHostsMap = getComponentHostsMap();
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
index f6325685..93c61bba 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java
@@ -34,9 +34,6 @@ public class ServiceConfigureJob extends AbstractServiceJob {
@Override
protected void createStages() {
- // Update cache files
- super.createCacheStage();
-
CommandDTO commandDTO = jobContext.getCommandDTO();
Map<String, List<String>> componentHostsMap = getComponentHostsMap();
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java
deleted file mode 100644
index 88c57081..00000000
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bigtop.manager.server.command.stage;
-
-import org.apache.bigtop.manager.dao.po.HostPO;
-import org.apache.bigtop.manager.server.command.task.CacheFileUpdateTask;
-import org.apache.bigtop.manager.server.command.task.Task;
-import org.apache.bigtop.manager.server.command.task.TaskContext;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CacheFileUpdateStage extends AbstractStage {
-
- public CacheFileUpdateStage(StageContext stageContext) {
- super(stageContext);
- }
-
- @Override
- protected void injectBeans() {
- super.injectBeans();
- }
-
- @Override
- protected void beforeCreateTasks() {
- List<String> hostnames = new ArrayList<>();
-
- if (stageContext.getClusterId() == null) {
- hostnames.addAll(stageContext.getHostnames() == null ? List.of() :
stageContext.getHostnames());
- } else {
- hostnames.addAll(stageContext.getHostnames() == null ? List.of() :
stageContext.getHostnames());
-
hostnames.addAll(hostDao.findAllByClusterId(stageContext.getClusterId()).stream()
- .map(HostPO::getHostname)
- .toList());
- }
-
- stageContext.setHostnames(hostnames);
- }
-
- @Override
- protected Task createTask(String hostname) {
- TaskContext taskContext = new TaskContext();
- taskContext.setHostname(hostname);
- taskContext.setClusterId(stageContext.getClusterId());
- taskContext.setClusterName(stageContext.getClusterName());
- taskContext.setUserGroup(stageContext.getUserGroup());
- taskContext.setRootDir(stageContext.getRootDir());
- taskContext.setServiceName("cluster");
- taskContext.setServiceUser("root");
- taskContext.setComponentName("agent");
- taskContext.setComponentDisplayName("Agent");
-
- Map<String, Object> properties = new HashMap<>();
- properties.put("hostnames", stageContext.getHostnames());
- taskContext.setProperties(properties);
-
- return new CacheFileUpdateTask(taskContext);
- }
-
- @Override
- public String getName() {
- return "Update cache files";
- }
-}