AMBARI-21043. Backport AMBARI-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (rlevas)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/bba703bc Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/bba703bc Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/bba703bc Branch: refs/heads/branch-feature-AMBARI-14714 Commit: bba703bc600555e596c49aef8749df65c9ac918c Parents: 8bf136a Author: Robert Levas <[email protected]> Authored: Wed May 17 14:33:03 2017 -0400 Committer: Robert Levas <[email protected]> Committed: Wed May 17 14:33:03 2017 -0400 ---------------------------------------------------------------------- .../server/upgrade/UpgradeCatalog251.java | 47 +++++++++- .../stacks/HDP/2.3/services/stack_advisor.py | 2 +- .../server/upgrade/UpgradeCatalog251Test.java | 92 ++++++++++++++++++++ 3 files changed, 139 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/bba703bc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java index 6f8f2a6..745890c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,9 +18,18 @@ package org.apache.ambari.server.upgrade; import java.sql.SQLException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.SecurityType; +import org.apache.commons.lang.StringUtils; import com.google.inject.Inject; import com.google.inject.Injector; @@ -33,6 +42,8 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog { static final String HOST_ROLE_COMMAND_TABLE = "host_role_command"; static final String HRC_IS_BACKGROUND_COLUMN = "is_background"; + protected static final String KAFKA_BROKER_CONFIG = "kafka-broker"; + /** * Constructor. * @@ -79,6 +90,40 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog { */ @Override protected void executeDMLUpdates() throws AmbariException, SQLException { + updateKAFKAConfigs(); + } + + /** + * Ensure that the updates from Ambari 2.4.0 are applied in the event the initial version is + * Ambari 2.5.0, since this Kafka change failed to make it into Ambari 2.5.0. + * + * If the base version was before Ambari 2.5.0, this method should wind up doing nothing. 
+ * @throws AmbariException + */ + protected void updateKAFKAConfigs() throws AmbariException { + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + Clusters clusters = ambariManagementController.getClusters(); + if (clusters != null) { + Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters); + if (clusterMap != null && !clusterMap.isEmpty()) { + for (final Cluster cluster : clusterMap.values()) { + Set<String> installedServices = cluster.getServices().keySet(); + + if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) { + Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG); + if (kafkaBroker != null) { + String listenersPropertyValue = kafkaBroker.getProperties().get("listeners"); + if (StringUtils.isNotEmpty(listenersPropertyValue)) { + String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL"); + if(!newListenersPropertyValue.equals(listenersPropertyValue)) { + updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false); + } + } + } + } + } + } + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/bba703bc/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py index 8cefdac..9efcee0 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py @@ -918,7 +918,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor): "HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations, "hive-site": self.validateHiveConfigurations}, "HBASE": {"hbase-site": 
self.validateHBASEConfigurations}, - "KAKFA": {"kafka-broker": self.validateKAFKAConfigurations}, + "KAFKA": {"kafka-broker": self.validateKAFKAConfigurations}, "RANGER": {"admin-properties": self.validateRangerAdminConfigurations, "ranger-env": self.validateRangerConfigurationsEnv} } http://git-wip-us.apache.org/repos/asf/ambari/blob/bba703bc/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java index 4575998..d725ec4 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java @@ -20,21 +20,29 @@ package org.apache.ambari.server.upgrade; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createMockBuilder; +import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.newCapture; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Collections; +import java.util.Map; import javax.persistence.EntityManager; import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.KerberosHelper; import 
org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.orm.DBAccessor; @@ -42,10 +50,12 @@ import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.stack.OsFamily; import org.easymock.Capture; import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; import org.easymock.Mock; import org.easymock.MockType; import org.junit.After; @@ -53,8 +63,10 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.security.crypto.password.PasswordEncoder; import com.google.gson.Gson; +import com.google.inject.AbstractModule; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -163,4 +175,84 @@ public class UpgradeCatalog251Test { Assert.assertEquals(Integer.valueOf(0), captured.getDefaultValue()); Assert.assertEquals(Short.class, captured.getType()); } + + @Test + public void testExecuteDMLUpdates() throws Exception { + Method updateKAFKAConfigs = UpgradeCatalog251.class.getDeclaredMethod("updateKAFKAConfigs"); + + UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class) + .addMockedMethod(updateKAFKAConfigs) + .createMock(); + + Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor"); + field.set(upgradeCatalog251, dbAccessor); + + upgradeCatalog251.updateKAFKAConfigs(); + expectLastCall().once(); + + replay(upgradeCatalog251, dbAccessor); + + upgradeCatalog251.executeDMLUpdates(); + + verify(upgradeCatalog251, dbAccessor); + } + + + @Test + public void testUpdateKAFKAConfigs() throws Exception{ + EasyMockSupport easyMockSupport = new EasyMockSupport(); + final 
AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class); + final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class); + final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class); + + Map<String, String> initialProperties = Collections.singletonMap("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666"); + Map<String, String> expectedUpdates = Collections.singletonMap("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666"); + + final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class); + expect(kafkaBroker.getProperties()).andReturn(initialProperties).times(1); + // Re-entrant test + expect(kafkaBroker.getProperties()).andReturn(expectedUpdates).times(1); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(AmbariManagementController.class).toInstance(mockAmbariManagementController); + bind(Clusters.class).toInstance(mockClusters); + bind(EntityManager.class).toInstance(entityManager); + bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); + bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class)); + } + }); + + expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).atLeastOnce(); + expect(mockClusters.getClusters()).andReturn(Collections.singletonMap("normal", mockClusterExpected)).atLeastOnce(); + expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce(); + expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS).atLeastOnce(); + expect(mockClusterExpected.getServices()).andReturn(Collections.<String, Service>singletonMap("KAFKA", null)).atLeastOnce(); + + UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class) + 
.withConstructor(Injector.class) + .withArgs(mockInjector) + .addMockedMethod("updateConfigurationProperties", String.class, + Map.class, boolean.class, boolean.class) + .createMock(); + + + // upgradeCatalog251.updateConfigurationProperties is only expected to execute once since no changes are + // expected when the relevant data have been previously changed + upgradeCatalog251.updateConfigurationProperties("kafka-broker", expectedUpdates, true, false); + expectLastCall().once(); + + easyMockSupport.replayAll(); + replay(upgradeCatalog251); + + // Execute the first time... upgrading to Ambari 2.4.0 + upgradeCatalog251.updateKAFKAConfigs(); + + // Test reentry... upgrading from Ambari 2.4.0 + upgradeCatalog251.updateKAFKAConfigs(); + + easyMockSupport.verifyAll(); + } }
