Repository: ambari Updated Branches: refs/heads/trunk cb1109d69 -> d6b861716
AMBARI-17929. Kafka brokers went down after Ambari upgrade due to IllegalArgumentException.(vbrodetskyi) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d6b86171 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d6b86171 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d6b86171 Branch: refs/heads/trunk Commit: d6b8617167484d22ceccfc6f1eb71d0b246392f4 Parents: cb1109d Author: Vitaly Brodetskyi <[email protected]> Authored: Wed Jul 27 23:08:03 2016 +0300 Committer: Vitaly Brodetskyi <[email protected]> Committed: Wed Jul 27 23:08:03 2016 +0300 ---------------------------------------------------------------------- .../server/upgrade/UpgradeCatalog240.java | 23 +++++++ .../server/upgrade/UpgradeCatalog240Test.java | 70 ++++++++++++++++++-- 2 files changed, 88 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d6b86171/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java index 5495655..a3d9c89 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java @@ -176,6 +176,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { protected static final String SLIDER_SERVICE_NAME = "SLIDER"; private static final String OOZIE_ENV_CONFIG = "oozie-env"; + protected static final String KAFKA_BROKER_CONFIG = "kafka-broker"; private static final String SLIDER_CLIENT_CONFIG = "slider-client"; private static final String HIVE_ENV_CONFIG = "hive-env"; private static final String AMS_SITE = "ams-site"; @@ -393,6 +394,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { addManageUserPersistedDataPermission(); allowClusterOperatorToManageCredentials(); updateHDFSConfigs(); + updateKAFKAConfigs(); updateHIVEConfigs(); updateAMSConfigs(); updateClusterEnv(); @@ -1900,6 +1902,27 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { } } + protected void updateKAFKAConfigs() throws AmbariException { + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + Clusters clusters = ambariManagementController.getClusters(); + Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters); + + for (final Cluster cluster : clusterMap.values()) { + Set<String> installedServices = cluster.getServices().keySet(); + + if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) { + Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG); + if (kafkaBroker != null) { + String listenersPropertyValue = kafkaBroker.getProperties().get("listeners"); + if (StringUtils.isNotEmpty(listenersPropertyValue)) { + String newListenersPropertyValue = listenersPropertyValue.replaceAll("PLAINTEXT", "PLAINTEXTSASL"); + updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false); + } + } + } + } + } + protected void updateHIVEConfigs() throws AmbariException { AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/d6b86171/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java index 34ca199..5bbfebd 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java @@ -56,8 +56,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import javax.persistence.EntityManager; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.api.services.AmbariMetaInfo; @@ -118,8 +116,12 @@ import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; -import org.junit.*; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.springframework.security.crypto.password.PasswordEncoder; import com.google.common.collect.Maps; import com.google.gson.Gson; @@ -130,8 +132,6 @@ import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.Provider; -import org.springframework.security.crypto.password.PasswordEncoder; - public class UpgradeCatalog240Test { private static final String CAPACITY_SCHEDULER_CONFIG_TYPE = "capacity-scheduler"; private static final String WEBHCAT_SITE_CONFIG_TYPE = "webhcat-site"; @@ -584,6 +584,7 @@ public class UpgradeCatalog240Test { Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML"); Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert"); Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties"); + Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs"); Capture<String> capturedStatements = newCapture(CaptureType.ALL); @@ -633,6 +634,7 @@ public class UpgradeCatalog240Test { .addMockedMethod(updateRecoveryConfigurationDML) .addMockedMethod(removeAtlasMetaserverAlert) .addMockedMethod(updateRangerHbasePluginProperties) + .addMockedMethod(updateKAFKAConfigs) .createMock(); Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor"); @@ -674,6 +676,7 @@ public class UpgradeCatalog240Test { upgradeCatalog240.updateRecoveryConfigurationDML(); upgradeCatalog240.removeAtlasMetaserverAlert(); upgradeCatalog240.updateRangerHbasePluginProperties(); + upgradeCatalog240.updateKAFKAConfigs(); replay(upgradeCatalog240, dbAccessor); @@ -1108,6 +1111,63 @@ public class UpgradeCatalog240Test { assertTrue(Maps.difference(newPropertiesYarnEnv, updatedProperties).areEqual()); } + @Test + public void testUpdateKAFKAConfigs() throws Exception{ + EasyMockSupport easyMockSupport = new EasyMockSupport(); + final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class); + final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class); + final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class); + + final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class); + expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{ + put("listeners", "PLAINTEXT://localhost:PLAINTEXT6667,PLAINTEXTSSL://localhost:6666PLAINTEXT"); + }} + ).anyTimes(); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(AmbariManagementController.class).toInstance(mockAmbariManagementController); + bind(Clusters.class).toInstance(mockClusters); + bind(EntityManager.class).toInstance(entityManager); + bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); + bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class)); + } + }); + + expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once(); + expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ + put("normal", mockClusterExpected); + }}).atLeastOnce(); + expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce(); + expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS); + expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() { + { + put("KAFKA", null); + } + }).atLeastOnce(); + + UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class) + .withConstructor(Injector.class) + .withArgs(mockInjector) + .addMockedMethod("updateConfigurationProperties", String.class, + Map.class, boolean.class, boolean.class) + .createMock(); + + Map<String, String> expectedUpdates = new HashMap<>(); + expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:PLAINTEXTSASL6667,PLAINTEXTSASLSSL://localhost:6666PLAINTEXTSASL"); + + upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates, + true, false); + expectLastCall().once(); + + easyMockSupport.replayAll(); + replay(upgradeCatalog240); + upgradeCatalog240.updateKAFKAConfigs(); + easyMockSupport.verifyAll(); + } + /** * Test that queue names updated in mapred-site, webhcat-site, tez-site, yarn-env * @throws Exception
