Repository: ambari Updated Branches: refs/heads/trunk d451aced8 -> e1d4d9766
AMBARI-9965 - RU - Improve performance for large cluster in StackVersionListener (tbeerbower) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e1d4d976 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e1d4d976 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e1d4d976 Branch: refs/heads/trunk Commit: e1d4d9766a222e9375a114e6f959499cabf05ea8 Parents: d451ace Author: tbeerbower <[email protected]> Authored: Sat Mar 7 09:48:13 2015 -0500 Committer: tbeerbower <[email protected]> Committed: Sat Mar 7 09:48:28 2015 -0500 ---------------------------------------------------------------------- .../ambari/server/agent/HeartBeatHandler.java | 69 ++++++++------- .../listeners/upgrade/StackVersionListener.java | 8 +- .../publishers/VersionEventPublisher.java | 65 ++++++++++++++ .../upgrade/StackVersionListenerTest.java | 60 +++++++++++++ .../publishers/VersionEventPublisherTest.java | 90 ++++++++++++++++++++ 5 files changed, 259 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index 539af00..8833148 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -31,7 +31,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; -import com.google.common.reflect.TypeToken; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.HostNotFoundException; import org.apache.ambari.server.Role; @@ -51,6 +50,7 @@ import org.apache.ambari.server.events.AlertReceivedEvent; import org.apache.ambari.server.events.HostComponentVersionEvent; import org.apache.ambari.server.events.publishers.AlertEventPublisher; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.events.publishers.VersionEventPublisher; import org.apache.ambari.server.metadata.ActionMetadata; import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO; import org.apache.ambari.server.serveraction.kerberos.KerberosActionDataFile; @@ -153,6 +153,9 @@ public class HeartBeatHandler { @Inject private AmbariEventPublisher ambariEventPublisher; + @Inject + private VersionEventPublisher versionEventPublisher; + /** * KerberosPrincipalHostDAO used to set and get Kerberos principal details */ @@ -523,14 +526,13 @@ public class HeartBeatHandler { //Json structure for component version was incorrect //do nothing, pass this data further for processing } - if (structuredOutput != null && StringUtils.isNotBlank(structuredOutput.getVersion())) { - handleComponentVersionReceived(scHost, structuredOutput.getVersion()); - } - // Safer to recalculate the version even if we don't detect a difference in the value. - // This is useful in case that a manual database edit is done while ambari-server is stopped. - // TODO should be included into handleComponentVersionReceived() after RU becomes stable - HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost); - ambariEventPublisher.publish(event); + + String newVersion = structuredOutput == null ? null : structuredOutput.getVersion(); + + // Pass true to always publish a version event. It is safer to recalculate the version even if we don't + // detect a difference in the value. This is useful in case that a manual database edit is done while + // ambari-server is stopped. + handleComponentVersionReceived(cl, scHost, newVersion, true); } // Updating stack version, if needed @@ -676,12 +678,9 @@ public class HeartBeatHandler { scHost.setProcesses(list); } if (extra.containsKey("version")) { - boolean versionWasUpdated = handleComponentVersionReceived(scHost, extra.get("version").toString()); - if (versionWasUpdated) { - // TODO should be included into handleComponentVersionReceived() after RU becomes stable - HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost); - ambariEventPublisher.publish(event); - } + String version = extra.get("version").toString(); + + handleComponentVersionReceived(cl, scHost, version, false); } } catch (Exception e) { @@ -734,27 +733,39 @@ public class HeartBeatHandler { } /** - * Updates version of service component and sets upgrade state if needed. + * Updates the version of the given service component, sets the upgrade state (if needed) + * and publishes a version event through the version event publisher. * - * @param scHost service component host - * @param newVersion new version of service component - * - * @return true if component version was updated to new one + * @param cluster the cluster + * @param scHost service component host + * @param newVersion new version of service component + * @param alwaysPublish if true, always publish a version event; if false, + * only publish if the component version was updated */ - private boolean handleComponentVersionReceived(ServiceComponentHost scHost, String newVersion) { - final String previousVersion = scHost.getVersion(); - if (!StringUtils.equals(previousVersion, newVersion)) { - scHost.setVersion(newVersion); - if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) { - scHost.setUpgradeState(UpgradeState.COMPLETE); + private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost, + String newVersion, boolean alwaysPublish) { + + boolean updated = false; + + if (StringUtils.isNotBlank(newVersion)) { + final String previousVersion = scHost.getVersion(); + if (!StringUtils.equals(previousVersion, newVersion)) { + scHost.setVersion(newVersion); + if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) { + scHost.setUpgradeState(UpgradeState.COMPLETE); + } + updated = true; } - return true; } - return false; + + if (updated || alwaysPublish) { + HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost); + versionEventPublisher.publish(event); + } } /** - * Adds commands from action queue to a heartbeat responce + * Adds commands from action queue to a heartbeat response. */ protected void sendCommands(String hostname, HeartBeatResponse response) throws AmbariException { http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java index 5460092..b09a273 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.ambari.server.EagerSingleton; import org.apache.ambari.server.events.HostComponentVersionEvent; -import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.events.publishers.VersionEventPublisher; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.ServiceComponentHost; import org.slf4j.Logger; @@ -58,11 +58,11 @@ public class StackVersionListener { /** * Constructor. * - * @param ambariEventPublisher + * @param eventPublisher the publisher */ @Inject - public StackVersionListener(AmbariEventPublisher ambariEventPublisher) { - ambariEventPublisher.register(this); + public StackVersionListener(VersionEventPublisher eventPublisher) { + eventPublisher.register(this); } @Subscribe http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java new file mode 100644 index 0000000..3a11f38 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events.publishers; + +import com.google.common.eventbus.EventBus; +import com.google.inject.Singleton; +import org.apache.ambari.server.events.HostComponentVersionEvent; + +/** + * The {@link VersionEventPublisher} is used to publish instances of + * {@link HostComponentVersionEvent} to any {@link com.google.common.eventbus.Subscribe} interested. + * It uses a single-threaded, serial {@link EventBus}. + */ +@Singleton +public class VersionEventPublisher { + /** + * A single threaded event bus for processing version events serially. + */ + private final EventBus m_eventBus; + + /** + * Constructor. + */ + public VersionEventPublisher() { + m_eventBus = new EventBus("version-event-bus"); + } + + /** + * Publishes the specified event to all registered listeners that + * {@link com.google.common.eventbus.Subscribe} to any of the + * {@link org.apache.ambari.server.events.HostComponentVersionEvent} instances. + * + * @param event the event + */ + public void publish(HostComponentVersionEvent event) { + m_eventBus.post(event); + } + + /** + * Register a listener to receive events. The listener should use the + * {@link com.google.common.eventbus.Subscribe} annotation. + * + * @param object + * the listener to receive events. + */ + public void register(Object object) { + m_eventBus.register(object); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java new file mode 100644 index 0000000..b44ac30 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events.listeners.upgrade; + +import org.apache.ambari.server.events.HostComponentVersionEvent; +import org.apache.ambari.server.events.publishers.VersionEventPublisher; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.junit.Test; + +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.*; + +/** + * StackVersionListener tests. + */ +public class StackVersionListenerTest { + + @Test + public void testOnAmbariEvent() throws Exception { + + VersionEventPublisher publisher = createNiceMock(VersionEventPublisher.class); + + Cluster cluster = createNiceMock(Cluster.class); + ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class); + + expect(cluster.getClusterId()).andReturn(99L); + cluster.recalculateClusterVersionState("1.0.0"); + + expect(sch.recalculateHostVersionState()).andReturn("1.0.0").anyTimes(); + + replay(cluster, sch); + + HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, sch); + + StackVersionListener listener = new StackVersionListener(publisher); + + listener.onAmbariEvent(event); + + verify(cluster, sch); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java new file mode 100644 index 0000000..071c6f0 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events.publishers; + +import com.google.common.eventbus.Subscribe; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.ambari.server.events.HostComponentVersionEvent; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; + +/** + * VersionEventPublisher tests. + */ +public class VersionEventPublisherTest { + + private Injector injector; + + @Before + public void setup() throws Exception { + injector = Guice.createInjector(); + } + + @Test + public void testPublish() throws Exception { + + Cluster cluster = createNiceMock(Cluster.class); + ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class); + + expect(cluster.getClusterId()).andReturn(99L); + + replay(cluster, sch); + + VersionEventPublisher publisher = injector.getInstance(VersionEventPublisher.class); + + Listener listener = injector.getInstance(Listener.class); + + HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, sch); + + publisher.publish(event); + + assertEquals(event, listener.getLastEvent()); + + verify(cluster, sch); + } + + private static class Listener { + + private HostComponentVersionEvent lastEvent = null; + + @Inject + private Listener(VersionEventPublisher eventPublisher) { + eventPublisher.register(this); + } + + @Subscribe + public void onEvent(HostComponentVersionEvent event) { + lastEvent = event; + } + + public HostComponentVersionEvent getLastEvent() { + return lastEvent; + } + } +} \ No newline at end of file
