http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java new file mode 100644 index 0000000..75549c3 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events.publishers; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.eventbus.EventBus; +import com.google.inject.Singleton; + +@Singleton +public abstract class BufferedUpdateEventPublisher<T> { + + private static final long TIMEOUT = 1L; + private final AtomicLong previousTime = new AtomicLong(0); + private final AtomicBoolean collecting = new AtomicBoolean(false); + private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + + public void publish(Collection<T> event, EventBus m_eventBus) { + long eventTime = System.currentTimeMillis(); + if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) { + buffer.addAll(event); + collecting.set(true); + scheduledExecutorService.schedule(getScheduledPublished(m_eventBus), + TIMEOUT, TimeUnit.MILLISECONDS); + } else if (collecting.get()) { + buffer.addAll(event); + } else { + //TODO add logging and metrics posting + previousTime.set(eventTime); + m_eventBus.post(event); + } + } + + protected abstract Runnable getScheduledPublished(EventBus m_eventBus); + + protected List<T> retrieveBuffer() { + List<T> bufferContent = new ArrayList<>(); + while (!buffer.isEmpty()) { + bufferContent.add(buffer.poll()); + } + return bufferContent; + } + + protected void resetCollecting() { + previousTime.set(System.currentTimeMillis()); + collecting.set(false); + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java index e32e715..a8c1b1d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java @@ -18,14 +18,7 @@ package org.apache.ambari.server.events.publishers; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.ambari.server.events.HostComponentUpdate; import org.apache.ambari.server.events.HostComponentsUpdateEvent; @@ -34,28 +27,11 @@ import com.google.common.eventbus.EventBus; import com.google.inject.Singleton; @Singleton -public class HostComponentUpdateEventPublisher { +public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentUpdate> { - private final Long TIMEOUT = 1000L; - private AtomicLong previousTime = new AtomicLong(0); - private AtomicBoolean collecting = new AtomicBoolean(false); - private ConcurrentLinkedQueue<HostComponentUpdate> buffer = new ConcurrentLinkedQueue<>(); - private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - - public void publish(HostComponentsUpdateEvent event, EventBus m_eventBus) { - Long eventTime = System.currentTimeMillis(); - if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) { - buffer.addAll(event.getHostComponentUpdates()); - collecting.set(true); - scheduledExecutorService.schedule(new HostComponentsEventRunnable(m_eventBus), - TIMEOUT, TimeUnit.MILLISECONDS); - } else if (collecting.get()) { - buffer.addAll(event.getHostComponentUpdates()); - } else { - //TODO add logging and metrics posting - previousTime.set(eventTime); - m_eventBus.post(event); - } + @Override + protected Runnable getScheduledPublished(EventBus m_eventBus) { + return new HostComponentsEventRunnable(m_eventBus); } private class HostComponentsEventRunnable implements Runnable { @@ -68,15 +44,12 @@ public class HostComponentUpdateEventPublisher { @Override public void run() { - List<HostComponentUpdate> hostComponentUpdates = new ArrayList<>(); - while (!buffer.isEmpty()) { - hostComponentUpdates.add(buffer.poll()); - } + List<HostComponentUpdate> hostComponentUpdates = retrieveBuffer(); + HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates); //TODO add logging and metrics posting eventBus.post(resultEvents); - previousTime.set(System.currentTimeMillis()); - collecting.set(false); + resetCollecting(); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java new file mode 100644 index 0000000..7bf1290 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events.publishers; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.ambari.server.events.ServiceUpdateEvent; + +import com.google.common.eventbus.EventBus; +import com.google.inject.Singleton; + +@Singleton +public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> { + + @Override + protected Runnable getScheduledPublished(EventBus m_eventBus) { + return new ServiceEventRunnable(m_eventBus); + } + + private class ServiceEventRunnable implements Runnable { + + private final EventBus eventBus; + + public ServiceEventRunnable(EventBus eventBus) { + this.eventBus = eventBus; + } + + @Override + public void run() { + List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer(); + List<ServiceUpdateEvent> filtered = new ArrayList<>(); + for (ServiceUpdateEvent event : serviceUpdates) { + int pos = filtered.indexOf(event); + if (pos != -1) { + if (event.getState() != null) { + filtered.get(pos).setState(event.getState()); + } + if (event.getMaintenanceState() != null) { + filtered.get(pos).setMaintenanceState(event.getMaintenanceState()); + } + } else { + filtered.add(event); + } + } + for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) { + eventBus.post(serviceUpdateEvent); + } + resetCollecting(); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java index 53738f4..7d343a5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java @@ -17,11 +17,13 @@ */ package org.apache.ambari.server.events.publishers; +import java.util.Collections; import java.util.concurrent.Executors; import org.apache.ambari.server.events.AmbariUpdateEvent; import org.apache.ambari.server.events.HostComponentsUpdateEvent; import org.apache.ambari.server.events.RequestUpdateEvent; +import org.apache.ambari.server.events.ServiceUpdateEvent; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -29,7 +31,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; @Singleton -public final class StateUpdateEventPublisher { +public class StateUpdateEventPublisher { private final EventBus m_eventBus; @@ -39,6 +41,9 @@ public final class StateUpdateEventPublisher { @Inject private HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher; + @Inject + private ServiceUpdateEventPublisher serviceUpdateEventPublisher; + public StateUpdateEventPublisher() { m_eventBus = new AsyncEventBus("ambari-update-bus", Executors.newSingleThreadExecutor()); @@ -48,7 +53,9 @@ public final class StateUpdateEventPublisher { if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) { requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus); } else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) { - hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus); + hostComponentUpdateEventPublisher.publish(((HostComponentsUpdateEvent) event).getHostComponentUpdates(), m_eventBus); + } else if (event.getType().equals(AmbariUpdateEvent.Type.SERVICE)) { + serviceUpdateEventPublisher.publish(Collections.singletonList((ServiceUpdateEvent) event), m_eventBus); } else { m_eventBus.post(event); } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostComponentDesiredStateDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostComponentDesiredStateDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostComponentDesiredStateDAO.java index 57e409c..f6687cf 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostComponentDesiredStateDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostComponentDesiredStateDAO.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.orm.dao; +import java.util.Collection; import java.util.List; import javax.persistence.EntityManager; @@ -90,7 +91,7 @@ public class HostComponentDesiredStateDAO { public HostComponentDesiredStateEntity findByIndex(Long clusterId, String serviceName, String componentName, Long hostId) { final TypedQuery<HostComponentDesiredStateEntity> query = entityManagerProvider.get() - .createNamedQuery("HostComponentDesiredStateEntity.findByIndex", HostComponentDesiredStateEntity.class); + .createNamedQuery("HostComponentDesiredStateEntity.findByIndexAndHost", HostComponentDesiredStateEntity.class); query.setParameter("clusterId", clusterId); query.setParameter("serviceName", serviceName); @@ -100,6 +101,37 @@ public class HostComponentDesiredStateDAO { return daoUtils.selectSingle(query); } + /** + * Retrieve the single Host Component Desired State for the given unique cluster, service, component, and host. + * + * @param clusterId Cluster ID + * @param serviceName Service Name + * @param componentName Component Name + * @return Return the Host Component Desired State entity that match the criteria. + */ + @RequiresSession + public List<HostComponentDesiredStateEntity> findByIndex(Long clusterId, String serviceName, + String componentName) { + final TypedQuery<HostComponentDesiredStateEntity> query = entityManagerProvider.get() + .createNamedQuery("HostComponentDesiredStateEntity.findByIndex", HostComponentDesiredStateEntity.class); + + query.setParameter("clusterId", clusterId); + query.setParameter("serviceName", serviceName); + query.setParameter("componentName", componentName); + + return daoUtils.selectList(query); + } + + @RequiresSession + public List<HostComponentDesiredStateEntity> findByHosts(Collection<Long> hostIds) { + final TypedQuery<HostComponentDesiredStateEntity> query = entityManagerProvider.get() + .createNamedQuery("HostComponentDesiredStateEntity.findByHosts", HostComponentDesiredStateEntity.class); + + query.setParameter("hostIds", hostIds); + + return daoUtils.selectList(query); + } + @Transactional public void refresh(HostComponentDesiredStateEntity hostComponentDesiredStateEntity) { entityManagerProvider.get().refresh(hostComponentDesiredStateEntity); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ServiceComponentDesiredStateDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ServiceComponentDesiredStateDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ServiceComponentDesiredStateDAO.java index dfe7d7b..f519974 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ServiceComponentDesiredStateDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ServiceComponentDesiredStateDAO.java @@ -18,15 +18,23 @@ package org.apache.ambari.server.orm.dao; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import javax.persistence.EntityManager; import javax.persistence.NoResultException; import javax.persistence.TypedQuery; +import javax.persistence.criteria.CriteriaBuilder; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Predicate; +import javax.persistence.criteria.Root; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.ServiceComponentVersionEntity; +import org.apache.commons.collections.MapUtils; import com.google.inject.Inject; import com.google.inject.Provider; @@ -99,6 +107,35 @@ public class ServiceComponentDesiredStateDAO { return entity; } + /** + * Finds a {@link ServiceComponentDesiredStateEntity} by a combination of cluster id, service and component names. + * @param serviceComponentDesiredStates - component names mapped by service names and cluster ids. + * @return all found entities according to input map. + */ + @RequiresSession + public List<ServiceComponentDesiredStateEntity> findByNames(Map<Long, Map<String, List<String>>> serviceComponentDesiredStates) { + if (MapUtils.isEmpty(serviceComponentDesiredStates)) { + return Collections.emptyList(); + } + CriteriaBuilder cb = entityManagerProvider.get().getCriteriaBuilder(); + CriteriaQuery<ServiceComponentDesiredStateEntity> cq = cb.createQuery(ServiceComponentDesiredStateEntity.class); + Root<ServiceComponentDesiredStateEntity> desiredStates = cq.from(ServiceComponentDesiredStateEntity.class); + + List<Predicate> clusters = new ArrayList<>(); + for (Map.Entry<Long, Map<String, List<String>>> cluster : serviceComponentDesiredStates.entrySet()) { + List<Predicate> services = new ArrayList<>(); + for (Map.Entry<String, List<String>> service : cluster.getValue().entrySet()) { + services.add(cb.and(cb.equal(desiredStates.get("serviceName"), service.getKey()), + desiredStates.get("componentName").in(service.getValue()))); + } + clusters.add(cb.and(cb.equal(desiredStates.get("clusterId"), cluster.getKey()), + cb.or(services.toArray(new Predicate[services.size()])))); + } + cq.where(cb.or(clusters.toArray(new Predicate[clusters.size()]))); + TypedQuery<ServiceComponentDesiredStateEntity> query = entityManagerProvider.get().createQuery(cq); + return daoUtils.selectList(query); + } + @Transactional public void refresh(ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity) { entityManagerProvider.get().refresh(serviceComponentDesiredStateEntity); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentDesiredStateEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentDesiredStateEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentDesiredStateEntity.java index 36a7a25..c7a38d0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentDesiredStateEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentDesiredStateEntity.java @@ -65,8 +65,14 @@ import com.google.common.base.Objects; @NamedQuery(name = "HostComponentDesiredStateEntity.findByServiceComponentAndHost", query = "SELECT hcds from HostComponentDesiredStateEntity hcds WHERE hcds.serviceName=:serviceName AND hcds.componentName=:componentName AND hcds.hostEntity.hostName=:hostName"), - @NamedQuery(name = "HostComponentDesiredStateEntity.findByIndex", query = + @NamedQuery(name = "HostComponentDesiredStateEntity.findByIndexAndHost", query = "SELECT hcds from HostComponentDesiredStateEntity hcds WHERE hcds.clusterId=:clusterId AND hcds.serviceName=:serviceName AND hcds.componentName=:componentName AND hcds.hostId=:hostId"), + + @NamedQuery(name = "HostComponentDesiredStateEntity.findByIndex", query = + "SELECT hcds from HostComponentDesiredStateEntity hcds WHERE hcds.clusterId=:clusterId AND hcds.serviceName=:serviceName AND hcds.componentName=:componentName"), + + @NamedQuery(name = "HostComponentDesiredStateEntity.findByHosts", query = + "SELECT hcds from HostComponentDesiredStateEntity hcds WHERE hcds.hostId IN :hostIds"), }) public class HostComponentDesiredStateEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java index 0d295a4..bfbc4c6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java @@ -101,6 +101,10 @@ public class HostComponentStateEntity { private State currentState = State.INIT; @Enumerated(value = EnumType.STRING) + @Column(name = "last_live_state", nullable = true, insertable = true, updatable = true) + private State lastLiveState = State.INIT; + + @Enumerated(value = EnumType.STRING) @Column(name = "upgrade_state", nullable = false, insertable = true, updatable = true) private UpgradeState upgradeState = UpgradeState.NONE; @@ -159,6 +163,14 @@ public class HostComponentStateEntity { this.currentState = currentState; } + public State getLastLiveState() { + return lastLiveState; + } + + public void setLastLiveState(State lastLiveState) { + this.lastLiveState = lastLiveState; + } + public UpgradeState getUpgradeState() { return upgradeState; } @@ -205,6 +217,11 @@ public class HostComponentStateEntity { return false; } + if (lastLiveState != null ? !lastLiveState.equals(that.lastLiveState) + : that.lastLiveState != null) { + return false; + } + if (upgradeState != null ? !upgradeState.equals(that.upgradeState) : that.upgradeState != null) { return false; @@ -232,6 +249,7 @@ public class HostComponentStateEntity { result = 31 * result + (hostEntity != null ? hostEntity.hashCode() : 0); result = 31 * result + (componentName != null ? componentName.hashCode() : 0); result = 31 * result + (currentState != null ? currentState.hashCode() : 0); + result = 31 * result + (lastLiveState != null ? lastLiveState.hashCode() : 0); result = 31 * result + (upgradeState != null ? upgradeState.hashCode() : 0); result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0); result = 31 * result + (version != null ? version.hashCode() : 0); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java index 044cb0d..0d1fd5f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -1555,20 +1556,21 @@ public class ConfigHelper { /** * Collects actual configurations and configuration attributes for specified host. - * @param hostName host name to collect configurations and configuration attributes + * @param hostId host id to collect configurations and configuration attributes * @return event ready to send to agent * @throws AmbariException */ - public AgentConfigsUpdateEvent getHostActualConfigs(String hostName) throws AmbariException { + public AgentConfigsUpdateEvent getHostActualConfigs(Long hostId) throws AmbariException { TreeMap<String, ClusterConfigs> clustersConfigs = new TreeMap<>(); + Host host = clusters.getHostById(hostId); for (Cluster cl : clusters.getClusters().values()) { Map<String, Map<String, String>> configurations = new HashMap<>(); Map<String, Map<String, Map<String, String>>> configurationAttributes = new HashMap<>(); Map<String, DesiredConfig> clusterDesiredConfigs = cl.getDesiredConfigs(); Map<String, Map<String, String>> configTags = - getEffectiveDesiredTags(cl, hostName, clusterDesiredConfigs); + getEffectiveDesiredTags(cl, host.getHostName(), clusterDesiredConfigs); getAndMergeHostConfigs(configurations, configTags, cl); getAndMergeHostConfigAttributes(configurationAttributes, configTags, cl); @@ -1577,12 +1579,42 @@ public class ConfigHelper { configurations.entrySet().removeIf(e -> e.getValue().isEmpty()); configurationAttributes.entrySet().removeIf(e -> e.getValue().isEmpty()); + SortedMap<String, SortedMap<String, String>> configurationsTreeMap = sortConfigutations(configurations); + SortedMap<String, SortedMap<String, SortedMap<String, String>>> configurationAttributesTreeMap = + sortConfigurationAttributes(configurationAttributes); clustersConfigs.put(Long.toString(cl.getClusterId()), - new ClusterConfigs(configurations, configurationAttributes)); + new ClusterConfigs(configurationsTreeMap, configurationAttributesTreeMap)); } AgentConfigsUpdateEvent agentConfigsUpdateEvent = new AgentConfigsUpdateEvent(clustersConfigs); return agentConfigsUpdateEvent; } + public SortedMap<String, SortedMap<String, String>> sortConfigutations(Map<String, Map<String, String>> configurations) { + SortedMap<String, SortedMap<String, String>> configurationsTreeMap = new TreeMap<>(); + configurations.forEach((k, v) -> { + TreeMap<String, String> c = new TreeMap<>(); + c.putAll(v); + configurationsTreeMap.put(k, c); + }); + + return configurationsTreeMap; + } + + public SortedMap<String, SortedMap<String, SortedMap<String, String>>> sortConfigurationAttributes( + Map<String, Map<String, Map<String, String>>> configurationAttributes) { + SortedMap<String, SortedMap<String, SortedMap<String, String>>> configurationAttributesTreeMap = new TreeMap<>(); + configurationAttributes.forEach((k, v) -> { + SortedMap<String, SortedMap<String, String>> c = new TreeMap<>(); + v.forEach((k1, v1) -> { + SortedMap<String, String> c1 = new TreeMap<>(); + c1.putAll(v1); + c.put(k1, c1); + }); + configurationAttributesTreeMap.put(k, c); + }); + + return configurationAttributesTreeMap; + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/Host.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Host.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Host.java index 7f0be86..1c2f501 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Host.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Host.java @@ -294,6 +294,12 @@ public interface Host extends Comparable { void setState(HostState state); /** + * Set state of host's state machine. + * @param state + */ + void setStateMachineState(HostState state); + + /** * Get the prefix path of all logs * @return prefix */ @@ -407,4 +413,6 @@ public interface Host extends Comparable { * @see ComponentInfo#isVersionAdvertised() */ boolean hasComponentsAdvertisingVersions(StackId stackId) throws AmbariException; + + void calculateHostStatus(Long clusterId) throws AmbariException; } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java index 575193f..7f1c746 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java @@ -96,6 +96,10 @@ public interface ServiceComponentHost { void setState(State state); + State getLastValidState(); + + void setLastValidState(State state); + /** * Gets the version of the component. * @@ -142,6 +146,8 @@ public interface ServiceComponentHost { * @return */ ServiceComponentHostResponse convertToResponse(Map<String, DesiredConfig> desiredConfigs); + ServiceComponentHostResponse convertToResponseStatusOnly(Map<String, DesiredConfig> desiredConfigs, + boolean collectStaleConfigsStatus); void debugDump(StringBuilder sb); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostFactory.java index 9a7bd19..1ee839c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostFactory.java @@ -20,11 +20,15 @@ package org.apache.ambari.server.state; import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; +import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity; public interface ServiceComponentHostFactory { ServiceComponentHost createNew(ServiceComponent serviceComponent, String hostName); + ServiceComponentHost createNew(ServiceComponent serviceComponent, String hostName, + ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity); + ServiceComponentHost createExisting(ServiceComponent serviceComponent, HostComponentStateEntity stateEntity, HostComponentDesiredStateEntity desiredStateEntity); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java index 74b592b..aecd55d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ObjectNotFoundException; @@ -172,19 +173,21 @@ public class ServiceComponentImpl implements ServiceComponent { updateComponentInfo(); - for (HostComponentStateEntity hostComponentStateEntity : serviceComponentDesiredStateEntity.getHostComponentStateEntities()) { + List<HostComponentDesiredStateEntity> hostComponentDesiredStateEntities = hostComponentDesiredStateDAO.findByIndex( + service.getClusterId(), + service.getName(), + serviceComponentDesiredStateEntity.getComponentName() + ); - HostComponentDesiredStateEntity hostComponentDesiredStateEntity = hostComponentDesiredStateDAO.findByIndex( - hostComponentStateEntity.getClusterId(), - hostComponentStateEntity.getServiceName(), - hostComponentStateEntity.getComponentName(), - hostComponentStateEntity.getHostId() - ); + Map<String, HostComponentDesiredStateEntity> mappedHostComponentDesiredStateEntitites = + hostComponentDesiredStateEntities.stream().collect(Collectors.toMap(h -> h.getHostEntity().getHostName(), + java.util.function.Function.identity())); + for (HostComponentStateEntity hostComponentStateEntity : serviceComponentDesiredStateEntity.getHostComponentStateEntities()) { try { hostComponents.put(hostComponentStateEntity.getHostName(), serviceComponentHostFactory.createExisting(this, - hostComponentStateEntity, hostComponentDesiredStateEntity)); + hostComponentStateEntity, mappedHostComponentDesiredStateEntitites.get(hostComponentStateEntity.getHostName()))); } catch(ProvisionException ex) { StackId currentStackId = getDesiredStackId(); LOG.error(String.format("Can not get host component info: stackName=%s, stackVersion=%s, serviceName=%s, componentName=%s, hostname=%s", http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java index 52ea614..39e8d73 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java @@ -238,8 +238,9 @@ public class AlertDefinitionHash { return coerce(getAlertDefinitionEntities(clusterName, hostName)); } - public Map<Long, Map<Long, AlertDefinition>> getAlertDefinitions(String hostName) throws AmbariException { + public Map<Long, Map<Long, AlertDefinition>> getAlertDefinitions(Long hostId) throws AmbariException { Map<Long, Map<Long, AlertDefinition>> result = new HashMap<>(); + String hostName = m_clusters.get().getHostById(hostId).getHostName(); for (Cluster cluster : m_clusters.get().getClustersForHost(hostName)) { List<AlertDefinition> alertDefinitions = getAlertDefinitions(cluster.getClusterName(), hostName); result.put(cluster.getClusterId(), mapById(alertDefinitions)); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 7d874b5..48d3f5b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -78,6 +79,7 @@ import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.dao.AlertDispatchDAO; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ClusterStateDAO; +import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO; import org.apache.ambari.server.orm.dao.HostConfigMappingDAO; import org.apache.ambari.server.orm.dao.HostDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; @@ -90,6 +92,7 @@ import org.apache.ambari.server.orm.entities.ClusterEntity; import org.apache.ambari.server.orm.entities.ClusterServiceEntity; import org.apache.ambari.server.orm.entities.ClusterStateEntity; import org.apache.ambari.server.orm.entities.ConfigGroupEntity; +import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.PermissionEntity; @@ -306,6 +309,9 @@ public class ClusterImpl implements Cluster { @Inject private StateUpdateEventPublisher stateUpdateEventPublisher; + @Inject + private HostComponentDesiredStateDAO hostComponentDesiredStateDAO; + /** * A simple cache for looking up {@code cluster-env} properties for a cluster. * This map is changed whenever {{cluster-env}} is changed and we receive a @@ -2249,6 +2255,15 @@ public class ClusterImpl implements Cluster { Collection<Host> hosts = clusterHosts.values(); Iterator<Host> iterator = hosts.iterator(); + //TODO to version in sch + List<Long> hostIds = hosts.stream().map(Host::getHostId).collect(Collectors.toList()); + List<HostComponentDesiredStateEntity> hostComponentDesiredStateEntities = + hostIds.isEmpty() ? Collections.EMPTY_LIST : hostComponentDesiredStateDAO.findByHosts(hostIds); + Map<Long, Map<String, HostComponentDesiredStateEntity>> mappedHostIds = hostComponentDesiredStateEntities.stream().collect( + Collectors.groupingBy(HostComponentDesiredStateEntity::getHostId, + Collectors.toMap(HostComponentDesiredStateEntity::getComponentName, Function.identity()) + ) + ); while (iterator.hasNext()) { Host host = iterator.next(); String hostName = host.getHostName(); @@ -2287,8 +2302,15 @@ public class ClusterImpl implements Cluster { boolean maintenanceState = false; if (serviceComponentHostsByHost.containsKey(hostName)) { + Map<String, HostComponentDesiredStateEntity> componentsStates = mappedHostIds.get(host.getHostId()); for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostName)) { - staleConfig = staleConfig || configHelper.isStaleConfigs(sch, desiredConfigs); + HostComponentDesiredStateEntity componentState = componentsStates == null ? null : + componentsStates.get(sch.getServiceComponentName()); + if (componentState != null) { + staleConfig = staleConfig || configHelper.isStaleConfigs(sch, desiredConfigs, componentState); + } else { + staleConfig = staleConfig || configHelper.isStaleConfigs(sch, desiredConfigs); + } maintenanceState = maintenanceState || maintenanceStateHelper.getEffectiveState(sch) != MaintenanceState.OFF; } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java index 2a9e69e..8a659af 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java @@ -36,6 +36,7 @@ import org.apache.ambari.server.agent.HostInfo; import org.apache.ambari.server.agent.RecoveryReport; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.HostResponse; +import org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.events.HostStateUpdateEvent; import org.apache.ambari.server.events.HostStatusUpdateEvent; import org.apache.ambari.server.events.MaintenanceModeEvent; @@ -66,7 +67,11 @@ import org.apache.ambari.server.state.HostHealthStatus; import org.apache.ambari.server.state.HostHealthStatus.HealthStatus; import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.MaintenanceState; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.State; import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.fsm.SingleArcTransition; @@ -140,6 +145,9 @@ public class HostImpl implements Host { @Inject private AmbariEventPublisher ambariEventPublisher; + @Inject + private MaintenanceStateHelper maintenanceStateHelper; + /** * The ID of the host which is to retrieve it from JPA. */ @@ -318,14 +326,7 @@ public class HostImpl implements Host { @Override public void transition(HostImpl host, HostEvent event) { HostRegistrationRequestEvent e = (HostRegistrationRequestEvent) event; - host.importHostInfo(e.hostInfo); - host.setLastRegistrationTime(e.registrationTime); - //Initialize heartbeat time and timeInState with registration time. - host.setLastHeartbeatTime(e.registrationTime); - host.setLastAgentEnv(e.agentEnv); - host.setTimeInState(e.registrationTime); - host.setAgentVersion(e.agentVersion); - host.setPublicHostName(e.publicHostName); + host.updateHost(e); String agentVersion = null; if (e.agentVersion != null) { @@ -350,6 +351,11 @@ public class HostImpl implements Host { } host.topologyManager.onHostRegistered(host, associatedWithCluster); + try { + host.restoreComponentsStatuses(); + } catch (AmbariException e1) { + LOG.error("Unable to restore last valid host components status for host", e1); + } } } @@ -563,6 +569,12 @@ public class HostImpl implements Host { } @Override + public void setStateMachineState(HostState state) { + stateMachine.setCurrentState(state); + ambariEventPublisher.publish(new HostStateUpdateEvent(getHostName(), state)); + } + + @Override public void handleEvent(HostEvent event) throws InvalidStateTransitionException { if (LOG.isDebugEnabled()) { @@ -1178,6 +1190,93 @@ public class HostImpl implements Host { return false; } + + public void restoreComponentsStatuses() throws AmbariException { + Long clusterId = null; + for (Cluster cluster : clusters.getClustersForHost(getHostName())) { + clusterId = cluster.getClusterId(); + for (ServiceComponentHost sch : cluster.getServiceComponentHosts(getHostName())) { + Service s = cluster.getService(sch.getServiceName()); + ServiceComponent sc = s.getServiceComponent(sch.getServiceComponentName()); + if (!sc.isClientComponent() && + sch.getState().equals(State.UNKNOWN)) { + State lastValidState = sch.getLastValidState(); + LOG.warn("Restore component state to last valid state for component " + sc.getName() + " on " + + getHostName() + " to " + lastValidState); + sch.setState(lastValidState); + } + } + } + //TODO + if (clusterId != null) { + calculateHostStatus(clusterId); + } + } + + @Override + public void calculateHostStatus(Long clusterId) throws AmbariException { + //Use actual component status to compute the host status + int masterCount = 0; + int mastersRunning = 0; + int slaveCount = 0; + int slavesRunning = 0; + + StackId stackId; + Cluster cluster = clusters.getCluster(clusterId); + stackId = cluster.getDesiredStackVersion(); + + + List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostName); + for (ServiceComponentHost scHost : scHosts) { + ComponentInfo componentInfo = + ambariMetaInfo.getComponent(stackId.getStackName(), + stackId.getStackVersion(), scHost.getServiceName(), + scHost.getServiceComponentName()); + + String status = scHost.getState().name(); + + String category = componentInfo.getCategory(); + + if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, this)) { + if (category.equals("MASTER")) { + ++masterCount; + if (status.equals("STARTED")) { + ++mastersRunning; + } + } else if (category.equals("SLAVE")) { + ++slaveCount; + if (status.equals("STARTED")) { + ++slavesRunning; + } + } + } + } + + HostHealthStatus.HealthStatus healthStatus; + if (masterCount == mastersRunning && slaveCount == slavesRunning) { + healthStatus = HostHealthStatus.HealthStatus.HEALTHY; + } else if (masterCount > 0 && mastersRunning < masterCount) { + healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY; + } else { + healthStatus = HostHealthStatus.HealthStatus.ALERT; + } + + setStatus(healthStatus.name()); + } + + @Transactional + public void updateHost(HostRegistrationRequestEvent e) { + importHostInfo(e.hostInfo); + setLastRegistrationTime(e.registrationTime); + //Initialize heartbeat time and timeInState with registration time. + setLastHeartbeatTime(e.registrationTime); + setLastAgentEnv(e.agentEnv); + setTimeInState(e.registrationTime); + setAgentVersion(e.agentVersion); + setPublicHostName(e.publicHostName); + setTimeInState(System.currentTimeMillis()); + setState(HostState.INIT); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java index dfd5b06..7211dcd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java @@ -763,7 +763,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @AssistedInject public ServiceComponentHostImpl(@Assisted ServiceComponent serviceComponent, - @Assisted String hostName, Clusters clusters, StackDAO stackDAO, HostDAO hostDAO, + @Assisted String hostName, @Assisted ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity, + Clusters clusters, StackDAO stackDAO, HostDAO hostDAO, ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO, HostComponentStateDAO hostComponentStateDAO, HostComponentDesiredStateDAO hostComponentDesiredStateDAO, @@ -823,7 +824,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { desiredStateEntity.setAdminState(null); } - persistEntities(hostEntity, stateEntity, desiredStateEntity); + persistEntities(hostEntity, stateEntity, desiredStateEntity, serviceComponentDesiredStateEntity); // publish the service component installed event ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(getClusterId(), @@ -837,6 +838,17 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { resetLastOpInfo(); } + @AssistedInject + public ServiceComponentHostImpl(@Assisted ServiceComponent serviceComponent, + @Assisted String hostName, + Clusters clusters, StackDAO stackDAO, HostDAO hostDAO, + ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO, + HostComponentStateDAO hostComponentStateDAO, + HostComponentDesiredStateDAO hostComponentDesiredStateDAO, + AmbariEventPublisher eventPublisher) { + this(serviceComponent, hostName, null, clusters, stackDAO, hostDAO, + serviceComponentDesiredStateDAO, hostComponentStateDAO, hostComponentDesiredStateDAO, eventPublisher); + } @AssistedInject public ServiceComponentHostImpl(@Assisted ServiceComponent serviceComponent, @@ -894,6 +906,9 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { HostComponentStateEntity stateEntity = getStateEntity(); if (stateEntity != null) { stateEntity.setCurrentState(state); + if (state != State.UNKNOWN) { + stateEntity.setLastLiveState(state); + } stateEntity = hostComponentStateDAO.merge(stateEntity); if (!oldState.equals(state)) { stateUpdateEventPublisher.publish(new HostComponentsUpdateEvent(Collections.singletonList( @@ -907,6 +922,31 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { } @Override + public State getLastValidState() { + HostComponentStateEntity stateEntity = getStateEntity(); + if (stateEntity != null) { + return stateEntity.getLastLiveState(); + } + return State.UNKNOWN; + } + + @Override + public void setLastValidState(State state) { + if (state == State.UNKNOWN) { + return; + } + HostComponentStateEntity stateEntity = getStateEntity(); + if (stateEntity != null) { + stateEntity.setLastLiveState(state); + hostComponentStateDAO.merge(stateEntity); + } else { + LOG.warn("Setting a member on an entity object that may have been " + + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = " + + getServiceComponentName() + ", " + "hostName = " + getHostName()); + } + } + + @Override public String getVersion() { HostComponentStateEntity stateEntity = getStateEntity(); if (stateEntity != null) { @@ -1216,10 +1256,31 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { r.setActualConfigs(actualConfigs); r.setUpgradeState(upgradeState); - try { - r.setStaleConfig(helper.isStaleConfigs(this, desiredConfigs, hostComponentDesiredStateEntity)); - } catch (Exception e) { - LOG.error("Could not determine stale config", e); + return r; + } + + @Override + public ServiceComponentHostResponse convertToResponseStatusOnly(Map<String, DesiredConfig> desiredConfigs, + boolean collectStaleConfigsStatus) { + String clusterName = serviceComponent.getClusterName(); + String serviceName = serviceComponent.getServiceName(); + String serviceComponentName = serviceComponent.getName(); + String state = getState().toString(); + + ServiceComponentHostResponse r = new ServiceComponentHostResponse(clusterName, serviceName, + serviceComponentName, null, hostName, null, state, null, + null, null, null, null); + + if (collectStaleConfigsStatus) { + + try { + HostComponentDesiredStateEntity hostComponentDesiredStateEntity = getDesiredStateEntity(); + r.setStaleConfig(helper.isStaleConfigs(this, desiredConfigs, hostComponentDesiredStateEntity)); + } catch (Exception e) { + LOG.error("Could not determine stale config", e); + } + } else { + r.setStaleConfig(false); } return r; @@ -1252,10 +1313,13 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Transactional void persistEntities(HostEntity hostEntity, HostComponentStateEntity stateEntity, - HostComponentDesiredStateEntity desiredStateEntity) { - ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.findByName( - serviceComponent.getClusterId(), serviceComponent.getServiceName(), - serviceComponent.getName()); + HostComponentDesiredStateEntity desiredStateEntity, + ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity) { + + if (serviceComponentDesiredStateEntity == null) { + serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.findByName( + serviceComponent.getClusterId(), serviceComponent.getServiceName(), serviceComponent.getName()); + } desiredStateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity); desiredStateEntity.setHostEntity(hostEntity); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyDeleteFormer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyDeleteFormer.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyDeleteFormer.java index b20d774..a5953b9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyDeleteFormer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyDeleteFormer.java @@ -6,16 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.ambari.server.topology; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java index f4a2a78..be6b960 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java @@ -38,13 +38,16 @@ import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.orm.DBAccessor; import org.apache.ambari.server.orm.dao.DaoUtils; +import org.apache.ambari.server.orm.dao.HostComponentStateDAO; import org.apache.ambari.server.orm.dao.RequestDAO; +import org.apache.ambari.server.orm.entities.HostComponentStateEntity; import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ConfigHelper; +import org.apache.ambari.server.state.State; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +76,7 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog { protected static final String COMPONENT_TABLE = "servicecomponentdesiredstate"; protected static final String COMPONENT_DESIRED_STATE_TABLE = "hostcomponentdesiredstate"; protected static final String COMPONENT_STATE_TABLE = "hostcomponentstate"; + protected static final String COMPONENT_LAST_STATE_COLUMN = "last_live_state"; protected static final String SERVICE_DESIRED_STATE_TABLE = "servicedesiredstate"; protected static final String SECURITY_STATE_COLUMN = "security_state"; @@ -122,6 +126,7 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog { updateStageTable(); addOpsDisplayNameColumnToHostRoleCommand(); removeSecurityState(); + addHostComponentLastStateTable(); } protected void updateStageTable() throws SQLException { @@ -133,6 +138,11 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog { new DBAccessor.DBColumnInfo(REQUEST_DISPLAY_STATUS_COLUMN, String.class, 255, HostRoleStatus.PENDING, false)); } + protected void addHostComponentLastStateTable() throws SQLException { + dbAccessor.addColumn(COMPONENT_STATE_TABLE, + new DBAccessor.DBColumnInfo(COMPONENT_LAST_STATE_COLUMN, String.class, 255, State.UNKNOWN, true)); + } + /** * {@inheritDoc} */ @@ -149,6 +159,7 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog { showHcatDeletedUserMessage(); setStatusOfStagesAndRequests(); updateLogSearchConfigs(); + updateHostComponentLastStateTable(); } protected void showHcatDeletedUserMessage() { @@ -346,4 +357,22 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog { } } } + + protected void updateHostComponentLastStateTable() throws SQLException { + executeInTransaction(new Runnable() { + @Override + public void run() { + try { + HostComponentStateDAO hostComponentStateDAO = injector.getInstance(HostComponentStateDAO.class); + List<HostComponentStateEntity> hostComponentStateEntities = hostComponentStateDAO.findAll(); + for (HostComponentStateEntity hostComponentStateEntity : hostComponentStateEntities) { + hostComponentStateEntity.setLastLiveState(hostComponentStateEntity.getCurrentState()); + hostComponentStateDAO.merge(hostComponentStateEntity); + } + } catch (Exception e) { + LOG.warn("Setting status for stages and Requests threw exception. ", e); + } + } + }); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-AzureDB-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-AzureDB-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-AzureDB-CREATE.sql index b971cbc..8f0cb67 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-AzureDB-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-AzureDB-CREATE.sql @@ -278,6 +278,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR(255) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_state VARCHAR(255) NOT NULL, + last_live_state VARCHAR(255) NOT NULL DEFAULT 'UNKNOWN', host_id BIGINT NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql index e7359a7..5b03df5 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql @@ -201,6 +201,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR(255) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_state VARCHAR(255) NOT NULL, + last_live_state VARCHAR(255) NOT NULL DEFAULT 'UNKNOWN', host_id BIGINT NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql index 530411a..6bd2cb6 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql @@ -222,6 +222,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR(100) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_state VARCHAR(255) NOT NULL, + last_live_state VARCHAR(255) NOT NULL DEFAULT 'UNKNOWN', host_id BIGINT NOT NULL, service_name VARCHAR(100) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql index c0b2f0c..4d49dca 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql @@ -201,6 +201,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR2(255) NOT NULL, version VARCHAR2(32) DEFAULT 'UNKNOWN' NOT NULL, current_state VARCHAR2(255) NOT NULL, + last_live_state VARCHAR2(255) NOT NULL DEFAULT 'UNKNOWN', host_id NUMBER(19) NOT NULL, service_name VARCHAR2(255) NOT NULL, upgrade_state VARCHAR2(32) DEFAULT 'NONE' NOT NULL, http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql index 90cdbfe..4d084e8 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql @@ -199,6 +199,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR(255) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_state VARCHAR(255) NOT NULL, + last_live_state VARCHAR(255) NOT NULL DEFAULT 'UNKNOWN', host_id BIGINT NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql index 7f39535..a19ca73 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql @@ -198,6 +198,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR(255) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_state VARCHAR(255) NOT NULL, + last_live_state VARCHAR(255) NOT NULL DEFAULT 'UNKNOWN', host_id NUMERIC(19) NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql index aa06c4d..96fd7fc 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql @@ -212,6 +212,7 @@ CREATE TABLE hostcomponentstate ( component_name VARCHAR(255) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_state VARCHAR(255) NOT NULL, + last_live_state VARCHAR(255) NOT NULL DEFAULT 'UNKNOWN', host_id BIGINT NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE',
