This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 88e7148 AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk) 88e7148 is described below commit 88e7148b192a37a131b0a11343b06de8cb5d8095 Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Tue Aug 7 22:27:47 2018 +0300 AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk) --- .../ambari_agent/ComponentVersionReporter.py | 109 +++++++++++++++++++++ .../src/main/python/ambari_agent/Constants.py | 1 + .../ambari_agent/CustomServiceOrchestrator.py | 21 ++-- .../main/python/ambari_agent/HeartbeatThread.py | 7 ++ .../main/python/ambari_agent/models/commands.py | 1 + .../resource_management/libraries/script/script.py | 9 +- .../apache/ambari/server/agent/AgentReport.java | 36 +++---- .../ambari/server/agent/AgentReportsProcessor.java | 14 +-- ...ntReport.java => CommandStatusAgentReport.java} | 35 ++----- .../server/agent/ComponentStatusAgentReport.java | 36 +++++++ .../server/agent/ComponentVersionAgentReport.java | 36 +++++++ .../ambari/server/agent/HeartBeatHandler.java | 5 + .../ambari/server/agent/HeartbeatProcessor.java | 73 ++++++++++++-- ...AgentReport.java => HostStatusAgentReport.java} | 36 ++----- .../server/agent/stomp/AgentReportsController.java | 27 +++-- .../agent/stomp/dto/ComponentVersionReport.java | 68 +++++++++++++ .../agent/stomp/dto/ComponentVersionReports.java | 45 +++++++++ 17 files changed, 443 insertions(+), 116 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py b/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py new file mode 100644 index 0000000..eb4358b --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py @@ -0,0 +1,109 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import logging +import threading + +from ambari_agent import Constants +from collections import defaultdict + +from ambari_agent.models.commands import AgentCommand + +logger = logging.getLogger(__name__) + +class ComponentVersionReporter(threading.Thread): + def __init__(self, initializer_module): + self.initializer_module = initializer_module + self.topology_cache = initializer_module.topology_cache + self.customServiceOrchestrator = initializer_module.customServiceOrchestrator + self.server_responses_listener = initializer_module.server_responses_listener + threading.Thread.__init__(self) + + def run(self): + """ + Get version of all components by running get_version execution command. + """ + try: + cluster_reports = defaultdict(lambda:[]) + + for cluster_id in self.topology_cache.get_cluster_ids(): + topology_cache = self.topology_cache[cluster_id] + + if 'components' not in topology_cache: + continue + + current_host_id = self.topology_cache.get_current_host_id(cluster_id) + + if current_host_id is None: + continue + + cluster_components = topology_cache.components + for component_dict in cluster_components: + # check if component is installed on current host + if current_host_id not in component_dict.hostIds: + continue + + service_name = component_dict.serviceName + component_name = component_dict.componentName + + result = self.check_component_version(cluster_id, service_name, component_name) + + if result: + cluster_reports[cluster_id].append(result) + + self.send_updates_to_server(cluster_reports) + except: + logger.exception("Exception in ComponentVersionReporter") + + def check_component_version(self, cluster_id, service_name, component_name): + """ + Returns components version + """ + # if not a component + if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None: + return None + + command_dict = { + 'serviceName': service_name, + 'role': component_name, + 'clusterId': cluster_id, + 'commandType': AgentCommand.get_version, + } + + version_result = self.customServiceOrchestrator.requestComponentStatus(command_dict, command_name=AgentCommand.get_version) + + if version_result['exitcode'] or not 'structuredOut' in version_result or not 'version' in version_result['structuredOut']: + logger.error("Could not get version for component {0} of {1} service cluster_id={2}. Command returned: {3}".format(component_name, service_name, cluster_id, version_result)) + return None + + # TODO: check if no strout or version if not there + + result = { + 'serviceName': service_name, + 'componentName': component_name, + 'version': version_result['structuredOut']['version'], + 'clusterId': cluster_id, + } + + return result + + def send_updates_to_server(self, cluster_reports): + if not cluster_reports or not self.initializer_module.is_registered: + return + + self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_VERSION_REPORTS_ENDPOINT) diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index ed6b482..91141a6 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -38,6 +38,7 @@ CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs' HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params' ALERTS_DEFINITIONS_REQUEST_ENDPOINT = '/agents/alert_definitions' COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status' +COMPONENT_VERSION_REPORTS_ENDPOINT = '/reports/component_version' COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status' HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status' ALERTS_STATUS_REPORTS_ENDPOINT = '/reports/alerts_status' diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 1dd4fa0..41f18e5 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -93,6 +93,8 @@ class CustomServiceOrchestrator(object): 'status_command_stdout_{0}.txt') self.status_commands_stderr = os.path.join(self.tmp_dir, 'status_command_stderr_{0}.txt') + self.status_structured_out = os.path.join(self.tmp_dir, + 'status_structured-out-{0}.json') # Construct the hadoop credential lib JARs path self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir', @@ -307,7 +309,7 @@ class CustomServiceOrchestrator(object): return cmd_result def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name=None, - override_output_files=True, retry=False, is_status_command=False): + override_output_files=True, retry=False, is_status_command=False, tmpstrucoutfile=None): """ forced_command_name may be specified manually. In this case, value, defined at command json, is ignored. @@ -348,7 +350,8 @@ class CustomServiceOrchestrator(object): script_path = self.resolve_script_path(base_dir, script) script_tuple = (script_path, base_dir) - tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id)) + if not tmpstrucoutfile: + tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id)) # We don't support anything else yet if script_type.upper() != self.SCRIPT_TYPE_PYTHON: @@ -374,7 +377,7 @@ class CustomServiceOrchestrator(object): else: logger.info("Skipping generation of jceks files as this is a retry of the command") - json_path = self.dump_command_to_json(command, retry) + json_path = self.dump_command_to_json(command, retry, is_status_command) hooks = self.hooks_orchestrator.resolve_hooks(command, command_name) """:type hooks ambari_agent.CommandHooksOrchestrator.ResolvedHooks""" @@ -508,7 +511,7 @@ class CustomServiceOrchestrator(object): return command - def requestComponentStatus(self, command_header): + def requestComponentStatus(self, command_header, command_name="STATUS"): """ Component status is determined by exit code, returned by runCommand(). Exit code 0 means that component is running and any other exit code means that @@ -521,11 +524,13 @@ class CustomServiceOrchestrator(object): # make sure status commands that run in parallel don't use the same files status_commands_stdout = self.status_commands_stdout.format(uuid.uuid4()) status_commands_stderr = self.status_commands_stderr.format(uuid.uuid4()) + status_structured_out = self.status_structured_out.format(uuid.uuid4()) try: res = self.runCommand(command_header, status_commands_stdout, - status_commands_stderr, self.COMMAND_NAME_STATUS, - override_output_files=override_output_files, is_status_command=True) + status_commands_stderr, command_name, + override_output_files=override_output_files, is_status_command=True, + tmpstrucoutfile=status_structured_out) finally: try: os.unlink(status_commands_stdout) @@ -545,14 +550,14 @@ class CustomServiceOrchestrator(object): raise AgentException(message) return path - def dump_command_to_json(self, command, retry=False): + def dump_command_to_json(self, command, retry=False, is_status_command=False): """ Converts command to json file and returns file path """ # Now, dump the json file command_type = command['commandType'] - if command_type == AgentCommand.status: + if is_status_command: # make sure status commands that run in parallel don't use the same files file_path = os.path.join(self.tmp_dir, "status_command_{0}.json".format(uuid.uuid4())) else: diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 3403bb2..2d4e06b 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -27,6 +27,7 @@ from ambari_agent import Constants from ambari_agent.Register import Register from ambari_agent.Utils import BlockingDictionary from ambari_agent.Utils import Utils +from ambari_agent.ComponentVersionReporter import ComponentVersionReporter from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener from ambari_agent.listeners.TopologyEventListener import TopologyEventListener from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener @@ -146,14 +147,20 @@ class HeartbeatThread(threading.Thread): self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE) self.run_post_registration_actions() + self.initializer_module.is_registered = True # now when registration is done we can expose connection to other threads. self.initializer_module._connection = self.connection + self.report_components_initial_versions() + def run_post_registration_actions(self): for post_registration_action in self.post_registration_actions: post_registration_action() + def report_components_initial_versions(self): + ComponentVersionReporter(self.initializer_module).start() + def unregister(self): """ Disconnect and remove connection object from initializer_module so other threads cannot use it diff --git a/ambari-agent/src/main/python/ambari_agent/models/commands.py b/ambari-agent/src/main/python/ambari_agent/models/commands.py index eb96e9a..e9ea2c8 100644 --- a/ambari-agent/src/main/python/ambari_agent/models/commands.py +++ b/ambari-agent/src/main/python/ambari_agent/models/commands.py @@ -19,6 +19,7 @@ limitations under the License. class AgentCommand(object): status = "STATUS_COMMAND" + get_version = "GET_VERSION" execution = "EXECUTION_COMMAND" auto_execution = "AUTO_EXECUTION_COMMAND" background_execution = "BACKGROUND_EXECUTION_COMMAND" diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py index 556412a..52287a4 100644 --- a/ambari-common/src/main/python/resource_management/libraries/script/script.py +++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py @@ -260,10 +260,8 @@ class Script(object): stack_version_unformatted = str(default("/clusterLevelParams/stack_version", "")) stack_version_formatted = format_stack_version(stack_version_unformatted) if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted): - if command_name.lower() == "status": - request_version = default("/commandParams/request_version", None) - if request_version is not None: - return True + if command_name.lower() == "get_version": + return True else: # Populate version only on base commands return command_name.lower() == "start" or command_name.lower() == "install" or command_name.lower() == "restart" @@ -362,6 +360,9 @@ class Script(object): if self.should_expose_component_version(self.command_name): self.save_component_version_to_structured_out(self.command_name) + def get_version(self, env): + pass + def execute_prefix_function(self, command_name, afix, env): """ Execute action afix (prefix or suffix) based on command_name and afix type diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java index 817a238..1ec7028 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,37 +17,25 @@ */ package org.apache.ambari.server.agent; -import java.util.List; +import org.apache.ambari.server.AmbariException; -import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; +public abstract class AgentReport<R> { -public class AgentReport { + private final String hostName; + private final R report; - private String hostName; - private List<ComponentStatus> componentStatuses; - private List<CommandReport> reports; - private HostStatusReport hostStatusReport; - - public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) { + public AgentReport(String hostName, R report) { this.hostName = hostName; - this.componentStatuses = componentStatuses; - this.reports = reports; - this.hostStatusReport = hostStatusReport; + this.report = report; } public String getHostName() { return hostName; } - public List<ComponentStatus> getComponentStatuses() { - return componentStatuses; - } - - public List<CommandReport> getCommandReports() { - return reports; + public final void process() throws AmbariException { + process(report, hostName); } - public HostStatusReport getHostStatusReport() { - return hostStatusReport; - } + protected abstract void process(R report, String hostName) throws AmbariException; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java index ad5c6aa..7a2ec3a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java @@ -49,9 +49,6 @@ public class AgentReportsProcessor { } @Inject - private HeartBeatHandler hh; - - @Inject private UnitOfWork unitOfWork; @Inject @@ -77,17 +74,8 @@ public class AgentReportsProcessor { public void run() { try { unitOfWork.begin(); - String hostName = agentReport.getHostName(); try { - - //TODO rewrite with polymorphism usage. - if (agentReport.getCommandReports() != null) { - hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName); - } else if (agentReport.getComponentStatuses() != null) { - hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName); - } else if (agentReport.getHostStatusReport() != null) { - hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName); - } + agentReport.process(); } catch (AmbariException e) { LOG.error("Error processing agent reports", e); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java similarity index 50% copy from ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java copy to ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java index 817a238..04587ae 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java @@ -19,35 +19,18 @@ package org.apache.ambari.server.agent; import java.util.List; -import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; +import org.apache.ambari.server.AmbariException; -public class AgentReport { +public class CommandStatusAgentReport extends AgentReport<List<CommandReport>> { + private final HeartBeatHandler hh; - private String hostName; - private List<ComponentStatus> componentStatuses; - private List<CommandReport> reports; - private HostStatusReport hostStatusReport; - - public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) { - this.hostName = hostName; - this.componentStatuses = componentStatuses; - this.reports = reports; - this.hostStatusReport = hostStatusReport; - } - - public String getHostName() { - return hostName; - } - - public List<ComponentStatus> getComponentStatuses() { - return componentStatuses; - } - - public List<CommandReport> getCommandReports() { - return reports; + public CommandStatusAgentReport(HeartBeatHandler hh, String hostName, List<CommandReport> commandReports) { + super(hostName, commandReports); + this.hh = hh; } - public HostStatusReport getHostStatusReport() { - return hostStatusReport; + @Override + protected void process(List<CommandReport> report, String hostName) throws AmbariException { + hh.handleCommandReportStatus(report, hostName); } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java new file mode 100644 index 0000000..e6a3813 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.agent; + +import java.util.List; + +import org.apache.ambari.server.AmbariException; + +public class ComponentStatusAgentReport extends AgentReport<List<ComponentStatus>> { + private final HeartBeatHandler hh; + + public ComponentStatusAgentReport(HeartBeatHandler hh, String hostName, List<ComponentStatus> componentStatuses) { + super(hostName, componentStatuses); + this.hh = hh; + } + + @Override + protected void process(List<ComponentStatus> report, String hostName) throws AmbariException { + hh.handleComponentReportStatus(report, hostName); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java new file mode 100644 index 0000000..405d34a --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.agent; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports; + +public class ComponentVersionAgentReport extends AgentReport<ComponentVersionReports> { + private final HeartBeatHandler hh; + + public ComponentVersionAgentReport(HeartBeatHandler hh, String hostName, + ComponentVersionReports componentVersionReports) { + super(hostName, componentVersionReports); + this.hh = hh; + } + + @Override + protected void process(ComponentVersionReports report, String hostName) throws AmbariException { + hh.handleComponentVersionReports(report, hostName); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index 7d70390..1c225a9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.HostNotFoundException; import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports; import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; @@ -256,6 +257,10 @@ public class HeartBeatHandler { } } + public void handleComponentVersionReports(ComponentVersionReports componentVersionReports, String hostname) throws AmbariException { + heartbeatProcessor.processVersionReports(componentVersionReports, hostname); + } + protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException { LOG.debug("Received recovery report: {}", recoveryReport); Host host = clusterFsm.getHost(hostname); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java index ffbf3f3..e6b1937 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java @@ -41,6 +41,8 @@ import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.agent.ExecutionCommand.KeyNames; +import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReport; +import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.events.ActionFinalReportReceivedEvent; @@ -550,6 +552,70 @@ public class HeartbeatProcessor extends AbstractService{ } /** + * Process reports of components versions + * @throws AmbariException + */ + public void processVersionReports(ComponentVersionReports versionReports, String hostname) throws AmbariException { + Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); + for (Cluster cl : clusters) { + for (Map.Entry<String, List<ComponentVersionReport>> status : versionReports + .getComponentVersionReports().entrySet()) { + if (Long.valueOf(status.getKey()).equals(cl.getClusterId())) { + for (ComponentVersionReport versionReport : status.getValue()) { + try { + Service svc = cl.getService(versionReport.getServiceName()); + + String componentName = versionReport.getComponentName(); + if (svc.getServiceComponents().containsKey(componentName)) { + ServiceComponent svcComp = svc.getServiceComponent( + componentName); + ServiceComponentHost scHost = svcComp.getServiceComponentHost( + hostname); + + String version = versionReport.getVersion(); + + HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl, + scHost, version); + versionEventPublisher.publish(event); + } + } catch (ServiceNotFoundException e) { + LOG.warn("Received a version report for a non-initialized" + + " service" + + ", clusterId=" + versionReport.getClusterId() + + ", serviceName=" + versionReport.getServiceName()); + continue; + } catch (ServiceComponentNotFoundException e) { + LOG.warn("Received a version report for a non-initialized" + + " servicecomponent" + + ", clusterId=" + versionReport.getClusterId() + + ", serviceName=" + versionReport.getServiceName() + + ", componentName=" + versionReport.getComponentName()); + continue; + } catch (ServiceComponentHostNotFoundException e) { + LOG.warn("Received a version report for a non-initialized" + + " hostcomponent" + + ", clusterId=" + versionReport.getClusterId() + + ", serviceName=" + versionReport.getServiceName() + + ", componentName=" + versionReport.getComponentName() + + ", hostname=" + hostname); + continue; + } catch (RuntimeException e) { + LOG.warn("Received a version report with invalid payload" + + " service" + + ", clusterId=" + versionReport.getClusterId() + + ", serviceName=" + versionReport.getServiceName() + + ", componentName=" + versionReport.getComponentName() + + ", hostname=" + hostname + + ", error=" + e.getMessage()); + continue; + } + } + } + } + } + } + + /** * Process reports of status commands * @throws AmbariException */ @@ -596,13 +662,6 @@ public class HeartbeatProcessor extends AbstractService{ List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes"); scHost.setProcesses(list); } - if (extra.containsKey("version")) { - String version = extra.get("version").toString(); - - HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl, scHost, version); - versionEventPublisher.publish(event); - } - } catch (Exception e) { LOG.error("Could not access extra JSON for " + scHost.getServiceComponentName() + " from " + diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java similarity index 53% copy from ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java copy to ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java index 817a238..a85f961 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java @@ -17,37 +17,19 @@ */ package org.apache.ambari.server.agent; -import java.util.List; - +import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; -public class AgentReport { - - private String hostName; - private List<ComponentStatus> componentStatuses; - private List<CommandReport> reports; - private HostStatusReport hostStatusReport; - - public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) { - this.hostName = hostName; - this.componentStatuses = componentStatuses; - this.reports = reports; - this.hostStatusReport = hostStatusReport; - } - - public String getHostName() { - return hostName; - } - - public List<ComponentStatus> getComponentStatuses() { - return componentStatuses; - } +public class HostStatusAgentReport extends AgentReport<HostStatusReport> { + private final HeartBeatHandler hh; - public List<CommandReport> getCommandReports() { - return reports; + public HostStatusAgentReport(HeartBeatHandler hh, String hostName, HostStatusReport hostStatusReport) { + super(hostName, hostStatusReport); + this.hh = hh; } - public HostStatusReport getHostStatusReport() { - return hostStatusReport; + @Override + protected void process(HostStatusReport report, String hostName) throws AmbariException { + hh.handleHostReportStatus(report, hostName); } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java index 82ddb1c..022f7a0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java @@ -27,16 +27,20 @@ import javax.ws.rs.WebApplicationException; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.HostNotRegisteredException; -import org.apache.ambari.server.agent.AgentReport; import org.apache.ambari.server.agent.AgentReportsProcessor; import org.apache.ambari.server.agent.AgentSessionManager; import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.CommandStatusAgentReport; import org.apache.ambari.server.agent.ComponentStatus; +import org.apache.ambari.server.agent.ComponentStatusAgentReport; +import org.apache.ambari.server.agent.ComponentVersionAgentReport; import org.apache.ambari.server.agent.HeartBeatHandler; +import org.apache.ambari.server.agent.HostStatusAgentReport; import org.apache.ambari.server.agent.stomp.dto.AckReport; import org.apache.ambari.server.agent.stomp.dto.CommandStatusReports; import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReport; import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReports; +import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports; import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; import org.apache.ambari.server.events.DefaultMessageEmitter; import org.apache.ambari.server.state.Alert; @@ -70,6 +74,15 @@ public class AgentReportsController { agentReportsProcessor = injector.getInstance(AgentReportsProcessor.class); } + @MessageMapping("/component_version") + public ReportsResponse handleComponentVersionReport(@Header String simpSessionId, ComponentVersionReports message) + throws WebApplicationException, InvalidStateTransitionException, AmbariException { + + agentReportsProcessor.addAgentReport(new ComponentVersionAgentReport(hh, + agentSessionManager.getHost(simpSessionId).getHostName(), message)); + return new ReportsResponse(); + } + @MessageMapping("/component_status") public ReportsResponse handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message) throws WebApplicationException, InvalidStateTransitionException, AmbariException { @@ -85,8 +98,8 @@ public class AgentReportsController { } } - agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(), - statuses, null, null)); + agentReportsProcessor.addAgentReport(new ComponentStatusAgentReport(hh, + agentSessionManager.getHost(simpSessionId).getHostName(), statuses)); return new ReportsResponse(); } @@ -98,15 +111,15 @@ public class AgentReportsController { statuses.addAll(clusterReport.getValue()); } - agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(), - null, statuses, null)); + agentReportsProcessor.addAgentReport(new CommandStatusAgentReport(hh, + agentSessionManager.getHost(simpSessionId).getHostName(), statuses)); return new ReportsResponse(); } @MessageMapping("/host_status") public ReportsResponse handleHostReportStatus(@Header String simpSessionId, HostStatusReport message) throws AmbariException { - agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(), - null, null, message)); + agentReportsProcessor.addAgentReport(new HostStatusAgentReport(hh, + agentSessionManager.getHost(simpSessionId).getHostName(), message)); return new ReportsResponse(); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java new file mode 100644 index 0000000..6619096 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.agent.stomp.dto; + +public class ComponentVersionReport { + private String componentName; + private String serviceName; + private String version; + private Long clusterId; + + public ComponentVersionReport() { + } + + public ComponentVersionReport(String componentName, String serviceName, String version, Long clusterId) { + this.componentName = componentName; + this.serviceName = serviceName; + this.version = version; + this.clusterId = clusterId; + } + + public String getComponentName() { + return componentName; + } + + public void setComponentName(String componentName) { + this.componentName = componentName; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java new file mode 100644 index 0000000..21e4210 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.agent.stomp.dto; + +import java.util.List; +import java.util.TreeMap; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ComponentVersionReports { + + @JsonProperty("clusters") + private TreeMap<String, List<ComponentVersionReport>> componentVersionReports; + + public ComponentVersionReports() { + } + + public ComponentVersionReports(TreeMap<String, List<ComponentVersionReport>> componentVersionReports) { + this.componentVersionReports = componentVersionReports; + } + + public TreeMap<String, List<ComponentVersionReport>> getComponentVersionReports() { + return componentVersionReports; + } + + public void setComponentVersionReports(TreeMap<String, List<ComponentVersionReport>> componentVersionReports) { + this.componentVersionReports = componentVersionReports; + } +}