AMBARI-22063. Poor performance of STOMP subscriptions cache and registration handling. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6ecac18c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6ecac18c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6ecac18c Branch: refs/heads/branch-3.0-perf Commit: 6ecac18cb5668ec3b74fe69751dc54d65f33f698 Parents: f0def7c Author: Myroslav Papirkovskyi <[email protected]> Authored: Thu Oct 5 13:45:13 2017 +0300 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Thu Oct 5 13:45:13 2017 +0300 ---------------------------------------------------------------------- .../server/HostNotRegisteredException.java | 4 +- .../server/actionmanager/ActionScheduler.java | 20 +- .../apache/ambari/server/agent/AgentReport.java | 53 ++ .../server/agent/AgentReportsProcessor.java | 100 ++++ .../server/agent/AgentSessionManager.java | 20 +- .../ambari/server/agent/HeartBeatHandler.java | 121 +---- .../ambari/server/agent/HeartbeatMonitor.java | 2 + .../ambari/server/agent/HeartbeatProcessor.java | 49 +- .../agent/stomp/AgentClusterDataHolder.java | 27 +- .../server/agent/stomp/AgentConfigsHolder.java | 20 +- .../agent/stomp/AgentCurrentDataController.java | 19 +- .../server/agent/stomp/AgentHostDataHolder.java | 40 +- .../agent/stomp/AgentReportsController.java | 24 +- .../agent/stomp/AgentsRegistrationQueue.java | 76 +++ .../agent/stomp/AlertDefinitionsHolder.java | 40 +- .../agent/stomp/AmbariSubscriptionRegistry.java | 536 +++++++++++++++++++ .../server/agent/stomp/HeartbeatController.java | 150 ++++-- .../agent/stomp/HostLevelParamsHolder.java | 10 +- .../server/agent/stomp/MetadataHolder.java | 6 +- .../server/agent/stomp/dto/ClusterConfigs.java | 16 +- .../ambari/server/agent/stomp/dto/Hash.java | 6 +- .../server/agent/stomp/dto/MetadataCluster.java | 6 +- .../agent/stomp/dto/MetadataServiceInfo.java | 6 +- .../server/agent/stomp/dto/TopologyCluster.java | 6 +- .../agent/stomp/dto/TopologyComponent.java | 6 +- 
.../server/agent/stomp/dto/TopologyHost.java | 6 +- .../server/configuration/Configuration.java | 86 +++ .../spring/AgentRegisteringQueueChecker.java | 55 ++ .../configuration/spring/AgentStompConfig.java | 7 +- .../configuration/spring/GuiceBeansConfig.java | 12 + .../configuration/spring/RootStompConfig.java | 35 +- .../controller/AmbariManagementController.java | 3 + .../AmbariManagementControllerImpl.java | 49 +- .../ambari/server/controller/AmbariServer.java | 13 +- .../internal/HostResourceProvider.java | 8 +- .../state/DefaultServiceCalculatedState.java | 2 +- .../state/FlumeServiceCalculatedState.java | 2 +- .../state/HBaseServiceCalculatedState.java | 2 +- .../state/HDFSServiceCalculatedState.java | 2 +- .../state/HiveServiceCalculatedState.java | 2 +- .../state/OozieServiceCalculatedState.java | 2 +- .../state/YARNServiceCalculatedState.java | 2 +- .../server/events/AgentConfigsUpdateEvent.java | 20 +- .../events/AlertDefinitionsUpdateEvent.java | 12 +- .../server/events/AmbariHostUpdateEvent.java | 6 +- .../server/events/ConfigsUpdateEvent.java | 39 -- .../server/events/ExecutionCommandEvent.java | 26 +- .../events/HostLevelParamsUpdateEvent.java | 14 +- .../server/events/ServiceUpdateEvent.java | 6 +- .../listeners/requests/StateUpdateListener.java | 6 +- .../services/ServiceUpdateListener.java | 29 +- .../listeners/tasks/TaskStatusListener.java | 33 +- .../publishers/AgentCommandsPublisher.java | 48 +- .../BufferedUpdateEventPublisher.java | 73 +++ .../HostComponentUpdateEventPublisher.java | 41 +- .../publishers/ServiceUpdateEventPublisher.java | 68 +++ .../publishers/StateUpdateEventPublisher.java | 11 +- .../orm/dao/HostComponentDesiredStateDAO.java | 34 +- .../dao/ServiceComponentDesiredStateDAO.java | 37 ++ .../HostComponentDesiredStateEntity.java | 8 +- .../orm/entities/HostComponentStateEntity.java | 18 + .../ambari/server/state/ConfigHelper.java | 40 +- .../org/apache/ambari/server/state/Host.java | 8 + 
.../server/state/ServiceComponentHost.java | 6 + .../state/ServiceComponentHostFactory.java | 4 + .../server/state/ServiceComponentImpl.java | 19 +- .../server/state/alert/AlertDefinitionHash.java | 3 +- .../server/state/cluster/ClusterImpl.java | 24 +- .../ambari/server/state/host/HostImpl.java | 115 +++- .../svccomphost/ServiceComponentHostImpl.java | 84 ++- .../server/topology/TopologyDeleteFormer.java | 7 +- .../server/upgrade/UpgradeCatalog300.java | 29 + .../resources/Ambari-DDL-AzureDB-CREATE.sql | 1 + .../main/resources/Ambari-DDL-Derby-CREATE.sql | 1 + .../main/resources/Ambari-DDL-MySQL-CREATE.sql | 1 + .../main/resources/Ambari-DDL-Oracle-CREATE.sql | 1 + .../resources/Ambari-DDL-Postgres-CREATE.sql | 1 + .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 1 + .../resources/Ambari-DDL-SQLServer-CREATE.sql | 1 + .../actionmanager/TestActionScheduler.java | 142 +++-- .../server/agent/AgentSessionManagerTest.java | 25 +- .../listeners/tasks/TaskStatusListenerTest.java | 23 +- .../server/upgrade/UpgradeCatalog300Test.java | 11 + 83 files changed, 2066 insertions(+), 661 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java index eadd5f1..82d42cd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java @@ -27,8 +27,8 @@ public class HostNotRegisteredException extends AmbariException { return new HostNotRegisteredException(String.format("Host with sessionId '%s' not registered", sessionId)); } - public static 
HostNotRegisteredException forHostName(String hostName) { - return new HostNotRegisteredException(String.format("Host with hostName '%s' not registered", hostName)); + public static HostNotRegisteredException forHostId(Long hostId) { + return new HostNotRegisteredException(String.format("Host with hostId '%s' not registered", hostId)); } private HostNotRegisteredException(String message) { http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 595edcd..c41dd01 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -242,7 +242,7 @@ class ActionScheduler implements Runnable { UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher, Configuration configuration, Provider<EntityManager> entityManagerProvider, HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory, - RoleCommandOrderProvider roleCommandOrderProvider) { + RoleCommandOrderProvider roleCommandOrderProvider, AgentCommandsPublisher agentCommandsPublisher) { sleepTime = sleepTimeMilliSec; actionTimeout = actionTimeoutMilliSec; @@ -259,6 +259,7 @@ class ActionScheduler implements Runnable { this.hostRoleCommandFactory = hostRoleCommandFactory; jpaPublisher = null; this.roleCommandOrderProvider = roleCommandOrderProvider; + this.agentCommandsPublisher = agentCommandsPublisher; serverActionExecutor = new ServerActionExecutor(db, sleepTime); initializeCaches(); @@ -284,11 +285,12 @@ class ActionScheduler implements Runnable { ActionQueue actionQueue, Clusters fsmObject, int 
maxAttempts, HostsMap hostsMap, UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher, Configuration configuration, Provider<EntityManager> entityManagerProvider, - HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) { + HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory, + AgentCommandsPublisher agentCommandsPublisher) { this(sleepTimeMilliSec, actionTimeoutMilliSec, db, actionQueue, fsmObject, maxAttempts, hostsMap, unitOfWork, ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory, - null); + null, agentCommandsPublisher); } /** @@ -456,7 +458,7 @@ class ActionScheduler implements Runnable { // Commands that will be scheduled in current scheduler wakeup List<ExecutionCommand> commandsToSchedule = new ArrayList<>(); - Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create(); + Multimap<Long, AgentCommand> commandsToEnqueue = ArrayListMultimap.create(); Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule, commandsToEnqueue); @@ -559,7 +561,7 @@ class ActionScheduler implements Runnable { if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) { serverActionExecutor.awake(); } else { - commandsToEnqueue.put(cmd.getHostname(), cmd); + commandsToEnqueue.put(clusters.getHost(cmd.getHostname()).getHostId(), cmd); } } agentCommandsPublisher.sendAgentCommand(commandsToEnqueue); @@ -746,7 +748,7 @@ class ActionScheduler implements Runnable { * whether stage has succeeded or failed */ protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule, - Multimap<String, AgentCommand> commandsToEnqueue) throws AmbariException { + Multimap<Long, AgentCommand> commandsToEnqueue) throws AmbariException { LOG.debug("==> Collecting commands to schedule..."); // Map to track role status Map<String, RoleStats> roleStats = initRoleStats(s); @@ -1274,7 +1276,7 @@ 
class ActionScheduler implements Runnable { CancelCommand cancelCommand = new CancelCommand(); cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId()); cancelCommand.setReason(reason); - agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostName(), cancelCommand); + agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostId(), cancelCommand); } } @@ -1294,7 +1296,7 @@ class ActionScheduler implements Runnable { } } - void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<String, AgentCommand> commandsToEnqueue) { + void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<Long, AgentCommand> commandsToEnqueue) { for (HostRoleCommand hostRoleCommand : hostRoleCommands) { // There are no server actions in actionQueue if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) { @@ -1303,7 +1305,7 @@ class ActionScheduler implements Runnable { CancelCommand cancelCommand = new CancelCommand(); cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId()); cancelCommand.setReason("Stage timeout"); - commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand); + commandsToEnqueue.put(hostRoleCommand.getHostId(), cancelCommand); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java new file mode 100644 index 0000000..817a238 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.agent; + +import java.util.List; + +import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; + +public class AgentReport { + + private String hostName; + private List<ComponentStatus> componentStatuses; + private List<CommandReport> reports; + private HostStatusReport hostStatusReport; + + public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) { + this.hostName = hostName; + this.componentStatuses = componentStatuses; + this.reports = reports; + this.hostStatusReport = hostStatusReport; + } + + public String getHostName() { + return hostName; + } + + public List<ComponentStatus> getComponentStatuses() { + return componentStatuses; + } + + public List<CommandReport> getCommandReports() { + return reports; + } + + public HostStatusReport getHostStatusReport() { + return hostStatusReport; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java new file mode 100644 index 0000000..586a16e --- /dev/null +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.agent; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.google.inject.persist.UnitOfWork; + +@Singleton +public class AgentReportsProcessor { + private static final Logger LOG = LoggerFactory.getLogger(AgentReportsProcessor.class); + + private ScheduledExecutorService executor; + + private ConcurrentLinkedQueue<AgentReport> agentReportsQueue = new ConcurrentLinkedQueue<>(); + + public void addAgentReport(AgentReport agentReport) { + agentReportsQueue.add(agentReport); + } + + @Inject + private HeartBeatHandler hh; + + @Inject + 
private UnitOfWork unitOfWork; + + @Inject + private Configuration configuration; + + public AgentReportsProcessor() { + + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("agent-report-processor-%d").build(); + int poolSize = configuration.getAgentsReportThreadPoolSize(); + executor = Executors.newScheduledThreadPool(poolSize, threadFactory); + for (int i=0; i< poolSize; i++) { + executor.scheduleAtFixedRate(new AgentReportProcessingTask(), + configuration.getAgentsReportProcessingStartTimeout(), + configuration.getAgentsReportProcessingPeriod(), TimeUnit.SECONDS); + } + } + + private class AgentReportProcessingTask implements Runnable { + + @Override + public void run() { + try { + unitOfWork.begin(); + while (true) { + AgentReport agentReport = agentReportsQueue.poll(); + if (agentReport == null) { + break; + } + String hostName = agentReport.getHostName(); + try { + + //TODO rewrite with polymorphism usage. + if (agentReport.getCommandReports() != null) { + hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName); + } else if (agentReport.getComponentStatuses() != null) { + hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName); + } else if (agentReport.getHostStatusReport() != null) { + hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName); + } + } catch (AmbariException e) { + LOG.error("Error processing agent reports", e); + } + } + } finally { + unitOfWork.end(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java index 3040f55..2f435bb 100644 --- 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java @@ -30,14 +30,14 @@ import com.google.inject.Singleton; public class AgentSessionManager { private final ConcurrentMap<String, Host> registeredHosts = new ConcurrentHashMap<>(); // session ID -> host - private final ConcurrentMap<String, String> registeredSessionIds = new ConcurrentHashMap<>(); // hostname -> session ID + private final ConcurrentMap<Long, String> registeredSessionIds = new ConcurrentHashMap<>(); public void register(String sessionId, Host host) { Preconditions.checkNotNull(sessionId); Preconditions.checkNotNull(host); - Preconditions.checkNotNull(host.getHostName()); + Preconditions.checkNotNull(host.getHostId()); - String oldSessionId = registeredSessionIds.put(host.getHostName(), sessionId); + String oldSessionId = registeredSessionIds.put(host.getHostId(), sessionId); if (oldSessionId != null) { registeredHosts.remove(oldSessionId); } @@ -59,21 +59,21 @@ public class AgentSessionManager { throw HostNotRegisteredException.forSessionId(sessionId); } - public String getSessionId(String hostName) throws HostNotRegisteredException { - Preconditions.checkNotNull(hostName); + public String getSessionId(Long hostId) throws HostNotRegisteredException { + Preconditions.checkNotNull(hostId); - String sessionId = registeredSessionIds.get(hostName); + String sessionId = registeredSessionIds.get(hostId); if (sessionId != null) { return sessionId; } - throw HostNotRegisteredException.forHostName(hostName); + throw HostNotRegisteredException.forHostId(hostId); } - public void unregisterByHost(String hostName) { - Preconditions.checkNotNull(hostName); + public void unregisterByHost(Long hostId) { + Preconditions.checkNotNull(hostId); - String sessionId = registeredSessionIds.remove(hostName); + String sessionId = registeredSessionIds.remove(hostId); if (sessionId != null) { 
registeredHosts.remove(sessionId); } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index 00d469f..ee6e05c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -17,11 +17,9 @@ */ package org.apache.ambari.server.agent; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; @@ -42,7 +40,6 @@ import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.StackId; -import org.apache.ambari.server.state.alert.AlertDefinition; import org.apache.ambari.server.state.alert.AlertDefinitionHash; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent; @@ -190,6 +187,15 @@ public class HeartBeatHandler { hostResponseIds.put(hostname, currentResponseId); hostResponses.put(hostname, response); + // If the host is waiting for component status updates, notify it + if (hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) { + try { + LOG.debug("Got component status updates for host {}", hostname); + hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now)); + } catch (InvalidStateTransitionException e) { + LOG.warn("Failed to notify the host {} about component status updates", hostname, e); + } + } if 
(heartbeat.getRecoveryReport() != null) { RecoveryReport rr = heartbeat.getRecoveryReport(); processRecoveryReport(rr, hostname); @@ -204,27 +210,6 @@ public class HeartBeatHandler { return createRegisterCommand(); } - /* - * A host can belong to only one cluster. Though getClustersForHost(hostname) - * returns a set of clusters, it will have only one entry. - * - * TODO: Handle the case when a host is a part of multiple clusters. - */ - Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); - - if (clusters.size() > 0) { - String clusterName = clusters.iterator().next().getClusterName(); - - if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) { - RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname); - response.setRecoveryConfig(rc); - - if (response.getRecoveryConfig() != null) { - LOG.debug("Recovery configuration set to {}", response.getRecoveryConfig().toString()); - } - } - } - heartbeatProcessor.addHeartbeat(heartbeat); // Send commands if node is active @@ -253,7 +238,7 @@ public class HeartBeatHandler { } catch (InvalidStateTransitionException ex) { LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex); host.setState(HostState.INIT); - agentSessionManager.unregisterByHost(hostname); + agentSessionManager.unregisterByHost(host.getHostId()); } } @@ -338,19 +323,11 @@ public class HeartBeatHandler { } // Resetting host state - hostObject.setState(HostState.INIT); + hostObject.setStateMachineState(HostState.INIT); // Set ping port for agent hostObject.setCurrentPingPort(currentPingPort); - // Get status of service components - List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname); - - // Add request for component version - for (StatusCommand command: cmds) { - command.getCommandParams().put("request_version", String.valueOf(true)); - } - // Save the prefix of the log file paths hostObject.setPrefix(register.getPrefix()); @@ -360,43 +337,6 
@@ public class HeartBeatHandler { register.getAgentEnv())); RegistrationResponse response = new RegistrationResponse(); - if (cmds.isEmpty()) { - //No status commands needed let the fsm know that status step is done - hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, - now)); - } - - response.setStatusCommands(cmds); - - response.setResponseStatus(RegistrationStatus.OK); - - // force the registering agent host to receive its list of alert definitions - List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname); - response.setAlertDefinitionCommands(alertDefinitionCommands); - - response.setAgentConfig(config.getAgentConfigsMap()); - if(response.getAgentConfig() != null) { - LOG.debug("Agent configuration map set to {}", response.getAgentConfig()); - } - - /* - * A host can belong to only one cluster. Though getClustersForHost(hostname) - * returns a set of clusters, it will have only one entry. - * - * TODO: Handle the case when a host is a part of multiple clusters. - */ - Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); - - if (clusters.size() > 0) { - String clusterName = clusters.iterator().next().getClusterName(); - - RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname); - response.setRecoveryConfig(rc); - - if(response.getRecoveryConfig() != null) { - LOG.info("Recovery configuration set to " + response.getRecoveryConfig()); - } - } Long requestId = 0L; hostResponseIds.put(hostname, requestId); @@ -463,45 +403,6 @@ public class HeartBeatHandler { return response; } - /** - * Gets the {@link AlertDefinitionCommand} instances that need to be sent for - * each cluster that the registering host is a member of. 
- * - * @param hostname - * @return - * @throws AmbariException - */ - private List<AlertDefinitionCommand> getRegistrationAlertDefinitionCommands( - String hostname) throws AmbariException { - - Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname); - if (null == hostClusters || hostClusters.size() == 0) { - return null; - } - - List<AlertDefinitionCommand> commands = new ArrayList<>(); - - // for every cluster this host is a member of, build the command - for (Cluster cluster : hostClusters) { - String clusterName = cluster.getClusterName(); - alertDefinitionHash.invalidate(clusterName, hostname); - - List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions( - clusterName, hostname); - - String hash = alertDefinitionHash.getHash(clusterName, hostname); - Host host = cluster.getHost(hostname); - String publicHostName = host == null? hostname : host.getPublicHostName(); - AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, - hostname, publicHostName, hash, definitions); - - command.addConfigs(configHelper, cluster); - commands.add(command); - } - - return commands; - } - public void stop() { heartbeatMonitor.shutdown(); heartbeatProcessor.stopAsync(); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java index 29db219..c5caf85 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java @@ -171,7 +171,9 @@ public class HeartbeatMonitor implements Runnable { !sch.getState().equals(State.UNINSTALLED) && !sch.getState().equals(State.DISABLED)) { LOG.warn("Setting 
component state to UNKNOWN for component " + sc.getName() + " on " + host); + State oldState = sch.getState(); sch.setState(State.UNKNOWN); + sch.setLastValidState(oldState); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java index ef9b0f2..466b24c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java @@ -56,15 +56,12 @@ import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.HostHealthStatus; import org.apache.ambari.server.state.HostState; -import org.apache.ambari.server.state.MaintenanceState; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; -import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.UpgradeState; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.host.HostStatusUpdatesReceivedEvent; @@ -297,51 +294,7 @@ public class HeartbeatProcessor extends AbstractService{ } if (calculateHostStatus) { - //Use actual component status to compute the host status - int masterCount = 0; - int mastersRunning = 0; - int slaveCount = 0; - int slavesRunning = 0; - - Cluster cluster = 
clusterFsm.getCluster(clusterId); - - List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostName); - for (ServiceComponentHost scHost : scHosts) { - StackId stackId = scHost.getDesiredStackId(); - - ComponentInfo componentInfo = - ambariMetaInfo.getComponent(stackId.getStackName(), - stackId.getStackVersion(), scHost.getServiceName(), - scHost.getServiceComponentName()); - - String status = scHost.getState().name(); - - String category = componentInfo.getCategory(); - - if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, host)) { - if (category.equals("MASTER")) { - ++masterCount; - if (status.equals("STARTED")) { - ++mastersRunning; - } - } else if (category.equals("SLAVE")) { - ++slaveCount; - if (status.equals("STARTED")) { - ++slavesRunning; - } - } - } - } - - if (masterCount == mastersRunning && slaveCount == slavesRunning) { - healthStatus = HostHealthStatus.HealthStatus.HEALTHY; - } else if (masterCount > 0 && mastersRunning < masterCount) { - healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY; - } else { - healthStatus = HostHealthStatus.HealthStatus.ALERT; - } - - host.setStatus(healthStatus.name()); + host.calculateHostStatus(clusterId); } //If host doesn't belong to any cluster http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java index 11f299c..f966386 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you 
may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,8 @@ package org.apache.ambari.server.agent.stomp; import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.inject.Inject; @@ -38,9 +40,17 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha private T data; + //TODO perhaps need optimization + private Lock lock = new ReentrantLock(); + public T getUpdateIfChanged(String agentHash) throws AmbariException { - initializeDataIfNeeded(true); - return !Objects.equals(agentHash, data.getHash()) ? data : getEmptyData(); + try { + lock.lock(); + initializeDataIfNeeded(true); + return !Objects.equals(agentHash, data.getHash()) ? 
data : getEmptyData(); + } finally { + lock.unlock(); + } } /** @@ -71,7 +81,12 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha } protected final void regenerateHash() { - regenerateHash(data); + try { + lock.lock(); + regenerateHash(data); + } finally { + lock.unlock(); + } } protected final void initializeDataIfNeeded(boolean regenerateHash) throws AmbariException { http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java index 54d8c23..50779ff 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java @@ -40,28 +40,28 @@ public class AgentConfigsHolder extends AgentHostDataHolder<AgentConfigsUpdateEv private Provider<Clusters> clusters; @Override - public AgentConfigsUpdateEvent getCurrentData(String hostName) throws AmbariException { - return configHelper.getHostActualConfigs(hostName); + public AgentConfigsUpdateEvent getCurrentData(Long hostId) throws AmbariException { + return configHelper.getHostActualConfigs(hostId); } protected boolean handleUpdate(AgentConfigsUpdateEvent update) throws AmbariException { - setData(update, update.getHostName()); + setData(update, update.getHostId()); return true; } - public void updateData(Long clusterId, List<String> hostNames) throws AmbariException { - if (CollectionUtils.isEmpty(hostNames)) { + public void updateData(Long clusterId, List<Long> hostIds) throws AmbariException { + if (CollectionUtils.isEmpty(hostIds)) { // TODO cluster configs will be created before hosts assigning if 
(CollectionUtils.isEmpty(clusters.get().getCluster(clusterId).getHosts())) { - hostNames = clusters.get().getHosts().stream().map(Host::getHostName).collect(Collectors.toList()); + hostIds = clusters.get().getHosts().stream().map(Host::getHostId).collect(Collectors.toList()); } else { - hostNames = clusters.get().getCluster(clusterId).getHosts().stream().map(Host::getHostName).collect(Collectors.toList()); + hostIds = clusters.get().getCluster(clusterId).getHosts().stream().map(Host::getHostId).collect(Collectors.toList()); } } - for (String hostName : hostNames) { - AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostName); - agentConfigsUpdateEvent.setHostName(hostName); + for (Long hostId : hostIds) { + AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostId); + agentConfigsUpdateEvent.setHostId(hostId); updateData(agentConfigsUpdateEvent); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java index 5ea5f06..0a46ce1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java @@ -29,7 +29,6 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.simp.annotation.SendToUser; -import org.springframework.messaging.simp.annotation.SubscribeMapping; import 
org.springframework.stereotype.Controller; import com.google.inject.Injector; @@ -55,30 +54,30 @@ public class AgentCurrentDataController { alertDefinitionsHolder = injector.getInstance(AlertDefinitionsHolder.class); } - @SubscribeMapping("/topologies") + @MessageMapping("/topologies") public TopologyUpdateEvent getCurrentTopology(Hash hash) throws AmbariException, InvalidStateTransitionException { return topologyHolder.getUpdateIfChanged(hash.getHash()); } - @SubscribeMapping("/metadata") + @MessageMapping("/metadata") public MetadataUpdateEvent getCurrentMetadata(Hash hash) throws AmbariException { return metadataHolder.getUpdateIfChanged(hash.getHash()); } - @SubscribeMapping("/alert_definitions") + @MessageMapping("/alert_definitions") public AlertDefinitionsUpdateEvent getAlertDefinitions(@Header String simpSessionId, Hash hash) throws AmbariException { - String hostName = agentSessionManager.getHost(simpSessionId).getHostName(); - return alertDefinitionsHolder.getUpdateIfChanged(hash.getHash(), hostName); + Long hostId = agentSessionManager.getHost(simpSessionId).getHostId(); + return alertDefinitionsHolder.getUpdateIfChanged(hash.getHash(), hostId); } - @SubscribeMapping("/configs") + @MessageMapping("/configs") public AgentConfigsUpdateEvent getCurrentConfigs(@Header String simpSessionId, Hash hash) throws AmbariException { - return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName()); + return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostId()); } - @SubscribeMapping("/host_level_params") + @MessageMapping("/host_level_params") public HostLevelParamsUpdateEvent getCurrentHostLevelParams(@Header String simpSessionId, Hash hash) throws AmbariException { - return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName()); + return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), 
agentSessionManager.getHost(simpSessionId).getHostId()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java index 746b755..7c540f9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java @@ -38,24 +38,24 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash @Inject private StateUpdateEventPublisher stateUpdateEventPublisher; - private final Map<String, T> data = new ConcurrentHashMap<>(); + private final Map<Long, T> data = new ConcurrentHashMap<>(); - protected abstract T getCurrentData(String hostName) throws AmbariException; + protected abstract T getCurrentData(Long hostId) throws AmbariException; protected abstract boolean handleUpdate(T update) throws AmbariException; - public T getUpdateIfChanged(String agentHash, String hostName) throws AmbariException { - T hostData = initializeDataIfNeeded(hostName, true); + public T getUpdateIfChanged(String agentHash, Long hostId) throws AmbariException { + T hostData = initializeDataIfNeeded(hostId, true); return !Objects.equals(agentHash, hostData.getHash()) ? 
hostData : getEmptyData(); } - private T initializeDataIfNeeded(String hostName, boolean regenerateHash) throws AmbariException { - T hostData = data.get(hostName); + private T initializeDataIfNeeded(Long hostId, boolean regenerateHash) throws AmbariException { + T hostData = data.get(hostId); if (hostData == null) { - hostData = getCurrentData(hostName); + hostData = getCurrentData(hostId); if (regenerateHash) { regenerateHash(hostData); } - data.put(hostName, hostData); + data.put(hostId, hostData); } return hostData; } @@ -65,9 +65,9 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash * event to listeners. */ public final void updateData(T update) throws AmbariException { - initializeDataIfNeeded(update.getHostName(), false); + initializeDataIfNeeded(update.getHostId(), false); if (handleUpdate(update)) { - T hostData = getData(update.getHostName()); + T hostData = getData(update.getHostId()); regenerateHash(hostData); update.setHash(hostData.getHash()); stateUpdateEventPublisher.publish(update); @@ -77,28 +77,28 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash /** * Reset data for the given host. Used if changes are complex and it's easier to re-create data from scratch. */ - public final void resetData(String hostName) throws AmbariException { - T newData = getCurrentData(hostName); - data.replace(hostName, newData); + public final void resetData(Long hostId) throws AmbariException { + T newData = getCurrentData(hostId); + data.replace(hostId, newData); stateUpdateEventPublisher.publish(newData); } /** * Remove data for the given host. 
*/ - public final void onHostRemoved(String hostName) { - data.remove(hostName); + public final void onHostRemoved(String hostId) { + data.remove(hostId); } - public Map<String, T> getData() { + public Map<Long, T> getData() { return data; } - public T getData(String hostName) { - return data.get(hostName); + public T getData(Long hostId) { + return data.get(hostId); } - public void setData(T data, String hostName) { - this.data.put(hostName, data); + public void setData(T data, Long hostId) { + this.data.put(hostId, data); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java index 5599254..ccfbc75 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java @@ -25,6 +25,8 @@ import java.util.Map; import javax.ws.rs.WebApplicationException; import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.agent.AgentReport; +import org.apache.ambari.server.agent.AgentReportsProcessor; import org.apache.ambari.server.agent.AgentSessionManager; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ComponentStatus; @@ -40,7 +42,6 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.simp.annotation.SendToUser; -import org.springframework.messaging.simp.annotation.SubscribeMapping; import org.springframework.stereotype.Controller; import 
com.google.inject.Injector; @@ -52,13 +53,15 @@ public class AgentReportsController { private static final Logger LOG = LoggerFactory.getLogger(AgentReportsController.class); private final HeartBeatHandler hh; private final AgentSessionManager agentSessionManager; + private final AgentReportsProcessor agentReportsProcessor; public AgentReportsController(Injector injector) { hh = injector.getInstance(HeartBeatHandler.class); agentSessionManager = injector.getInstance(AgentSessionManager.class); + agentReportsProcessor = injector.getInstance(AgentReportsProcessor.class); } - @SubscribeMapping("/component_status") + @MessageMapping("/component_status") public void handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message) throws WebApplicationException, InvalidStateTransitionException, AmbariException { List<ComponentStatus> statuses = new ArrayList<>(); @@ -73,11 +76,11 @@ public class AgentReportsController { } } - hh.handleComponentReportStatus(statuses, - agentSessionManager.getHost(simpSessionId).getHostName()); + agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(), + statuses, null, null)); } - @SubscribeMapping("/commands_status") + @MessageMapping("/commands_status") public void handleCommandReportStatus(@Header String simpSessionId, CommandStatusReports message) throws WebApplicationException, InvalidStateTransitionException, AmbariException { List<CommandReport> statuses = new ArrayList<>(); @@ -85,16 +88,17 @@ public class AgentReportsController { statuses.addAll(clusterReport.getValue()); } - hh.handleCommandReportStatus(statuses, - agentSessionManager.getHost(simpSessionId).getHostName()); + agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(), + null, statuses, null)); } - @SubscribeMapping("/host_status") + @MessageMapping("/host_status") public void handleHostReportStatus(@Header String simpSessionId, 
HostStatusReport message) throws AmbariException { - hh.handleHostReportStatus(message, agentSessionManager.getHost(simpSessionId).getHostName()); + agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(), + null, null, message)); } - @SubscribeMapping("/alerts_status") + @MessageMapping("/alerts_status") public void handleAlertsStatus(@Header String simpSessionId, Alert[] message) throws AmbariException { String hostName = agentSessionManager.getHost(simpSessionId).getHostName(); List<Alert> alerts = Arrays.asList(message); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java new file mode 100644 index 0000000..17518ad --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.agent.stomp; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.ambari.server.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Injector; + +/** + * Processing a large number of registration/topology/metadata etc. requests from agents simultaneously during + * agent registration can cause response timeouts on the agents' side. Therefore requests are processed simultaneously + * only for a limited number of agents, whose session ids are held in {@link #registrationQueue}. The queue has limited capacity; + * a session id is added to the queue when the agent connects to the server and is released on the first heartbeat or on disconnect from + * the server. 
+ */ +public class AgentsRegistrationQueue { + private static final Logger LOG = LoggerFactory.getLogger(AgentsRegistrationQueue.class); + private final BlockingQueue<String> registrationQueue; + private final ThreadFactory threadFactoryExecutor = new ThreadFactoryBuilder().setNameFormat("agents-queue-%d").build(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactoryExecutor); + + public AgentsRegistrationQueue(Injector injector) { + Configuration configuration = injector.getInstance(Configuration.class); + registrationQueue = new ArrayBlockingQueue<>(configuration.getAgentsRegistrationQueueSize()); + } + + public boolean offer(String sessionId) { + boolean offered = registrationQueue.offer(sessionId); + scheduledExecutorService.schedule(new CompleteJob(sessionId, registrationQueue), 60, TimeUnit.SECONDS); + return offered; + } + + public void complete(String sessionId) { + registrationQueue.remove(sessionId); + } + + private class CompleteJob implements Runnable { + private String sessionId; + private BlockingQueue<String> registrationQueue; + + public CompleteJob(String sessionId, BlockingQueue<String> registrationQueue) { + this.sessionId = sessionId; + this.registrationQueue = registrationQueue; + } + + @Override + public void run() { + registrationQueue.remove(sessionId); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java index 6c6bdd4..9c3f9b5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java @@ -67,9 +67,10 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions } @Override - protected AlertDefinitionsUpdateEvent getCurrentData(String hostName) throws AmbariException { + protected AlertDefinitionsUpdateEvent getCurrentData(Long hostId) throws AmbariException { Map<Long, AlertCluster> result = new TreeMap<>(); - Map<Long, Map<Long, AlertDefinition>> alertDefinitions = helper.get().getAlertDefinitions(hostName); + Map<Long, Map<Long, AlertDefinition>> alertDefinitions = helper.get().getAlertDefinitions(hostId); + String hostName = clusters.get().getHostById(hostId).getHostName(); long count = 0; for (Map.Entry<Long, Map<Long, AlertDefinition>> e : alertDefinitions.entrySet()) { Long clusterId = e.getKey(); @@ -78,7 +79,7 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions count += definitionMap.size(); } LOG.info("Loaded {} alert definitions for {} clusters for host {}", count, result.size(), hostName); - return new AlertDefinitionsUpdateEvent(CREATE, result, hostName); + return new AlertDefinitionsUpdateEvent(CREATE, result, hostName, hostId); } @Override @@ -93,9 +94,9 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions return false; } - String hostName = update.getHostName(); + Long hostId = update.getHostId(); boolean changed = false; - Map<Long, AlertCluster> existingClusters = getData(hostName).getClusters(); + Map<Long, AlertCluster> existingClusters = getData(hostId).getClusters(); switch (update.getEventType()) { case UPDATE: @@ -106,7 +107,7 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions for (Map.Entry<Long, AlertCluster> e : updateClusters.entrySet()) { changed |= existingClusters.get(e.getKey()).handleUpdate(update.getEventType(), e.getValue()); } - LOG.debug("Handled {} of alerts for {} cluster(s) on host {}, changed = 
{}", update.getEventType(), updateClusters.size(), hostName, changed); + LOG.debug("Handled {} of alerts for {} cluster(s) on host with id {}, changed = {}", update.getEventType(), updateClusters.size(), hostId, changed); break; case CREATE: if (!updateClusters.isEmpty()) { @@ -127,25 +128,26 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions } @Subscribe - public void onAlertDefinitionRegistered(AlertDefinitionRegistrationEvent event) { + public void onAlertDefinitionRegistered(AlertDefinitionRegistrationEvent event) throws AmbariException { handleSingleDefinitionChange(UPDATE, event.getDefinition()); } @Subscribe - public void onAlertDefinitionChanged(AlertDefinitionChangedEvent event) { + public void onAlertDefinitionChanged(AlertDefinitionChangedEvent event) throws AmbariException { handleSingleDefinitionChange(UPDATE, event.getDefinition()); } @Subscribe - public void onAlertDefinitionDeleted(AlertDefinitionDeleteEvent event) { + public void onAlertDefinitionDeleted(AlertDefinitionDeleteEvent event) throws AmbariException { handleSingleDefinitionChange(DELETE, event.getDefinition()); } @Subscribe - public void onServiceComponentInstalled(ServiceComponentInstalledEvent event) { + public void onServiceComponentInstalled(ServiceComponentInstalledEvent event) throws AmbariException { String hostName = event.getHostName(); String serviceName = event.getServiceName(); String componentName = event.getComponentName(); + Long hostId = clusters.get().getHost(hostName).getHostId(); Map<Long, AlertDefinition> definitions = helper.get().findByServiceComponent(event.getClusterId(), serviceName, componentName); @@ -162,18 +164,19 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions } Map<Long, AlertCluster> map = Collections.singletonMap(event.getClusterId(), new AlertCluster(definitions, hostName)); - safelyUpdateData(new AlertDefinitionsUpdateEvent(UPDATE, map, hostName)); + safelyUpdateData(new 
AlertDefinitionsUpdateEvent(UPDATE, map, hostName, hostId)); } @Subscribe - public void onServiceComponentUninstalled(ServiceComponentUninstalledEvent event) { + public void onServiceComponentUninstalled(ServiceComponentUninstalledEvent event) throws AmbariException { String hostName = event.getHostName(); + Long hostId = clusters.get().getHost(hostName).getHostId(); Map<Long, AlertDefinition> definitions = helper.get().findByServiceComponent(event.getClusterId(), event.getServiceName(), event.getComponentName()); if (event.isMasterComponent()) { definitions.putAll(helper.get().findByServiceMaster(event.getClusterId(), event.getServiceName())); } Map<Long, AlertCluster> map = Collections.singletonMap(event.getClusterId(), new AlertCluster(definitions, hostName)); - safelyUpdateData(new AlertDefinitionsUpdateEvent(DELETE, map, hostName)); + safelyUpdateData(new AlertDefinitionsUpdateEvent(DELETE, map, hostName, hostId)); } @Subscribe @@ -191,20 +194,21 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions } } - private void safelyResetData(String hostName) { + private void safelyResetData(Long hostId) { try { - resetData(hostName); + resetData(hostId); } catch (AmbariException e) { - LOG.warn(String.format("Failed to reset alert definitions for host %s", hostName), e); + LOG.warn(String.format("Failed to reset alert definitions for host with id %s", hostId), e); } } - private void handleSingleDefinitionChange(AlertDefinitionsUpdateEvent.EventType eventType, AlertDefinition alertDefinition) { + private void handleSingleDefinitionChange(AlertDefinitionsUpdateEvent.EventType eventType, AlertDefinition alertDefinition) throws AmbariException { LOG.info("{} alert definition '{}'", eventType, alertDefinition); Set<String> hosts = helper.get().invalidateHosts(alertDefinition); for (String hostName : hosts) { + Long hostId = clusters.get().getHost(hostName).getHostId(); Map<Long, AlertCluster> update = 
Collections.singletonMap(alertDefinition.getClusterId(), new AlertCluster(alertDefinition, hostName)); - AlertDefinitionsUpdateEvent event = new AlertDefinitionsUpdateEvent(eventType, update, hostName); + AlertDefinitionsUpdateEvent event = new AlertDefinitionsUpdateEvent(eventType, update, hostName, hostId); safelyUpdateData(event); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java new file mode 100644 index 0000000..aaab7bf --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java @@ -0,0 +1,536 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.agent.stomp; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.expression.AccessException; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.PropertyAccessor; +import org.springframework.expression.TypedValue; +import org.springframework.expression.spel.SpelEvaluationException; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.broker.AbstractSubscriptionRegistry; +import org.springframework.messaging.simp.broker.SubscriptionRegistry; +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.util.AntPathMatcher; +import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.PathMatcher; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Implementation of {@link SubscriptionRegistry} that has configurable cache size, optimized working with cache and + * destinations matching. 
+ */ +public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry { + private static final Logger LOG = LoggerFactory.getLogger(AmbariSubscriptionRegistry.class); + + private PathMatcher pathMatcher = new AntPathMatcher(); + + private volatile int cacheLimit; + + private String selectorHeaderName = "selector"; + + private volatile boolean selectorHeaderInUse = false; + + private final ExpressionParser expressionParser = new SpelExpressionParser(); + + private final DestinationCache destinationCache; + + private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry(); + + public AmbariSubscriptionRegistry(int cacheLimit) { + this.cacheLimit = cacheLimit; + destinationCache = new DestinationCache(); + } + + /** + * Specify the {@link PathMatcher} to use. + */ + public void setPathMatcher(PathMatcher pathMatcher) { + this.pathMatcher = pathMatcher; + } + + /** + * Return the configured {@link PathMatcher}. + */ + public PathMatcher getPathMatcher() { + return this.pathMatcher; + } + + /** + * Specify the maximum number of entries for the resolved destination cache. + * There is no built-in default; the initial value is passed to the constructor. + */ + public void setCacheLimit(int cacheLimit) { + this.cacheLimit = cacheLimit; + } + + /** + * Return the maximum number of entries for the resolved destination cache. + */ + public int getCacheLimit() { + return this.cacheLimit; + } + + /** + * Configure the name of a selector header that a subscription message can + * have in order to filter messages based on their headers. The value of the + * header can use Spring EL expressions against message headers. + * <p>For example the following expression expects a header called "foo" to + * have the value "bar": + * <pre> + * headers.foo == 'bar' + * </pre> + * <p>By default this is set to "selector". 
+ * @since 4.2 + */ + public void setSelectorHeaderName(String selectorHeaderName) { + Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null"); + this.selectorHeaderName = selectorHeaderName; + } + + /** + * Return the name for the selector header. + * @since 4.2 + */ + public String getSelectorHeaderName() { + return this.selectorHeaderName; + } + + + @Override + protected void addSubscriptionInternal( + String sessionId, String subsId, String destination, Message<?> message) { + + Expression expression = null; + MessageHeaders headers = message.getHeaders(); + String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers); + if (selector != null) { + try { + expression = this.expressionParser.parseExpression(selector); + this.selectorHeaderInUse = true; + if (logger.isTraceEnabled()) { + logger.trace("Subscription selector: [" + selector + "]"); + } + } + catch (Throwable ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to parse selector: " + selector, ex); + } + } + } + this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression); + this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); + } + + @Override + protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) { + SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId); + if (info != null) { + String destination = info.removeSubscription(subsId); + if (destination != null) { + this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId); + } + } + } + + @Override + public void unregisterAllSubscriptions(String sessionId) { + SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId); + if (info != null) { + this.destinationCache.updateAfterRemovedSession(info); + } + } + + @Override + protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> 
message) { + MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination, message); + return filterSubscriptions(result, message); + } + + private MultiValueMap<String, String> filterSubscriptions( + MultiValueMap<String, String> allMatches, Message<?> message) { + + if (!this.selectorHeaderInUse) { + return allMatches; + } + EvaluationContext context = null; + MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size()); + for (String sessionId : allMatches.keySet()) { + for (String subId : allMatches.get(sessionId)) { + SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId); + if (info == null) { + continue; + } + Subscription sub = info.getSubscription(subId); + if (sub == null) { + continue; + } + Expression expression = sub.getSelectorExpression(); + if (expression == null) { + result.add(sessionId, subId); + continue; + } + if (context == null) { + context = new StandardEvaluationContext(message); + context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor()); + } + try { + if (expression.getValue(context, boolean.class)) { + result.add(sessionId, subId); + } + } + catch (SpelEvaluationException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to evaluate selector: " + ex.getMessage()); + } + } + catch (Throwable ex) { + logger.debug("Failed to evaluate selector", ex); + } + } + } + return result; + } + + @Override + public String toString() { + return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]"; + } + + + /** + * A cache for destinations previously resolved via + * {@link org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)} + */ + private class DestinationCache { + + /** Map from destination -> <sessionId, subscriptionId> for fast look-ups */ + private final Map<String, LinkedMultiValueMap<String, String>> accessCache = + 
new ConcurrentHashMap<>(cacheLimit); + + private final Cache<String, String> notSubscriptionCache = + CacheBuilder.newBuilder().maximumSize(cacheLimit).build(); + + public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) { + if (notSubscriptionCache.asMap().keySet().contains(destination)) { + return new LinkedMultiValueMap<>(); + } + LinkedMultiValueMap<String, String> subscriptions = this.accessCache.computeIfAbsent(destination, (key) -> { + LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>(); + for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) { + for (String destinationPattern : info.getDestinations()) { + //TODO temporary changed to more fast-acting check without regex, need move investigation + if (destinationPattern.equals(destination)) { + for (Subscription subscription : info.getSubscriptions(destinationPattern)) { + result.add(info.sessionId, subscription.getId()); + } + } + } + } + if (!result.isEmpty()) { + return result; + } else { + notSubscriptionCache.put(destination, ""); + return null; + } + }); + return subscriptions == null ? 
new LinkedMultiValueMap<>() : subscriptions; + } + + public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { + this.accessCache.computeIfPresent(destination, (key, value) -> { + if (getPathMatcher().match(destination, key)) { + LinkedMultiValueMap<String, String> subs = value.deepCopy(); + subs.add(sessionId, subsId); + return subs; + } + return value; + }); + } + + public void updateAfterRemovedSubscription(String sessionId, String subsId) { + for (Iterator<Map.Entry<String, LinkedMultiValueMap<String, String>>> iterator = + this.accessCache.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next(); + String destination = entry.getKey(); + LinkedMultiValueMap<String, String> sessionMap = entry.getValue(); + List<String> subscriptions = sessionMap.get(sessionId); + if (subscriptions != null) { + subscriptions.remove(subsId); + if (subscriptions.isEmpty()) { + sessionMap.remove(sessionId); + } + if (sessionMap.isEmpty()) { + iterator.remove(); + } + else { + this.accessCache.put(destination, sessionMap.deepCopy()); + } + } + } + } + + public void updateAfterRemovedSession(SessionSubscriptionInfo info) { + for (Iterator<Map.Entry<String, LinkedMultiValueMap<String, String>>> iterator = + this.accessCache.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next(); + String destination = entry.getKey(); + LinkedMultiValueMap<String, String> sessionMap = entry.getValue(); + if (sessionMap.remove(info.getSessionId()) != null) { + if (sessionMap.isEmpty()) { + iterator.remove(); + } + else { + this.accessCache.put(destination, sessionMap.deepCopy()); + } + } + } + } + + @Override + public String toString() { + return "cache[" + this.accessCache.size() + " destination(s)]"; + } + } + + + /** + * Provide access to session subscriptions by sessionId. 
+ */ + private static class SessionSubscriptionRegistry { + + // sessionId -> SessionSubscriptionInfo + private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = + new ConcurrentHashMap<String, SessionSubscriptionInfo>(); + + public SessionSubscriptionInfo getSubscriptions(String sessionId) { + return this.sessions.get(sessionId); + } + + public Collection<SessionSubscriptionInfo> getAllSubscriptions() { + return this.sessions.values(); + } + + public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId, + String destination, Expression selectorExpression) { + + SessionSubscriptionInfo info = this.sessions.get(sessionId); + if (info == null) { + info = new SessionSubscriptionInfo(sessionId); + SessionSubscriptionInfo value = this.sessions.putIfAbsent(sessionId, info); + if (value != null) { + info = value; + } + } + info.addSubscription(destination, subscriptionId, selectorExpression); + return info; + } + + public SessionSubscriptionInfo removeSubscriptions(String sessionId) { + return this.sessions.remove(sessionId); + } + + @Override + public String toString() { + return "registry[" + this.sessions.size() + " sessions]"; + } + } + + + /** + * Hold subscriptions for a session. 
+ */ + private static class SessionSubscriptionInfo { + + private final String sessionId; + + // destination -> subscriptions + private final Map<String, Set<Subscription>> destinationLookup = + new ConcurrentHashMap<String, Set<Subscription>>(4); + + public SessionSubscriptionInfo(String sessionId) { + Assert.notNull(sessionId, "'sessionId' must not be null"); + this.sessionId = sessionId; + } + + public String getSessionId() { + return this.sessionId; + } + + public Set<String> getDestinations() { + return this.destinationLookup.keySet(); + } + + public Set<Subscription> getSubscriptions(String destination) { + return this.destinationLookup.get(destination); + } + + public Subscription getSubscription(String subscriptionId) { + for (String destination : this.destinationLookup.keySet()) { + Set<Subscription> subs = this.destinationLookup.get(destination); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.getId().equalsIgnoreCase(subscriptionId)) { + return sub; + } + } + } + } + return null; + } + + public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) { + Set<Subscription> subs = this.destinationLookup.get(destination); + if (subs == null) { + synchronized (this.destinationLookup) { + subs = this.destinationLookup.get(destination); + if (subs == null) { + subs = new CopyOnWriteArraySet<Subscription>(); + this.destinationLookup.put(destination, subs); + } + } + } + subs.add(new Subscription(subscriptionId, selectorExpression)); + } + + public String removeSubscription(String subscriptionId) { + for (String destination : this.destinationLookup.keySet()) { + Set<Subscription> subs = this.destinationLookup.get(destination); + if (subs != null) { + for (Subscription sub : subs) { + if (sub.getId().equals(subscriptionId) && subs.remove(sub)) { + synchronized (this.destinationLookup) { + if (subs.isEmpty()) { + this.destinationLookup.remove(destination); + } + } + return destination; + } + } + } + } + 
return null; + } + + @Override + public String toString() { + return "[sessionId=" + this.sessionId + ", subscriptions=" + this.destinationLookup + "]"; + } + } + + + private static final class Subscription { + + private final String id; + + private final Expression selectorExpression; + + public Subscription(String id, Expression selector) { + Assert.notNull(id, "Subscription id must not be null"); + this.id = id; + this.selectorExpression = selector; + } + + public String getId() { + return this.id; + } + + public Expression getSelectorExpression() { + return this.selectorExpression; + } + + @Override + public boolean equals(Object other) { + return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id))); + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } + + @Override + public String toString() { + return "subscription(id=" + this.id + ")"; + } + } + + + private static class SimpMessageHeaderPropertyAccessor implements PropertyAccessor { + + @Override + public Class<?>[] getSpecificTargetClasses() { + return new Class<?>[] {MessageHeaders.class}; + } + + @Override + public boolean canRead(EvaluationContext context, Object target, String name) { + return true; + } + + @Override + public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException { + MessageHeaders headers = (MessageHeaders) target; + SimpMessageHeaderAccessor accessor = + MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class); + Object value; + if ("destination".equalsIgnoreCase(name)) { + value = accessor.getDestination(); + } + else { + value = accessor.getFirstNativeHeader(name); + if (value == null) { + value = headers.get(name); + } + } + return new TypedValue(value); + } + + @Override + public boolean canWrite(EvaluationContext context, Object target, String name) { + return false; + } + + @Override + public void write(EvaluationContext context, Object target, String 
name, Object value) { + } + } + +}
