Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 45990d240 -> 39e75271a
Revert "AMBARI-17929. Kafka brokers went down after Ambari upgrade due to IllegalArgumentException.(vbrodetskyi)" This reverts commit f466ad91d9cab1657e638a1ffe6412c8cf9c3a3f. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9ffd91a3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9ffd91a3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9ffd91a3 Branch: refs/heads/branch-2.4 Commit: 9ffd91a3b0d142f34d6551e6db14ca3b64580f95 Parents: 45990d2 Author: Sumit Mohanty <[email protected]> Authored: Thu Jul 28 12:11:47 2016 -0700 Committer: Sumit Mohanty <[email protected]> Committed: Thu Jul 28 12:11:47 2016 -0700 ---------------------------------------------------------------------- .../server/upgrade/UpgradeCatalog240.java | 23 ------- .../server/upgrade/UpgradeCatalog240Test.java | 71 ++------------------ 2 files changed, 6 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/9ffd91a3/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java index a40c850..84b8817 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java @@ -176,7 +176,6 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { protected static final String SLIDER_SERVICE_NAME = "SLIDER"; private static final String OOZIE_ENV_CONFIG = "oozie-env"; - protected static final String KAFKA_BROKER_CONFIG = "kafka-broker"; private static final String SLIDER_CLIENT_CONFIG = "slider-client"; private static final 
String HIVE_ENV_CONFIG = "hive-env"; private static final String AMS_SITE = "ams-site"; @@ -394,7 +393,6 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { addManageUserPersistedDataPermission(); allowClusterOperatorToManageCredentials(); updateHDFSConfigs(); - updateKAFKAConfigs(); updateHIVEConfigs(); updateAMSConfigs(); updateClusterEnv(); @@ -1894,27 +1892,6 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { } } - protected void updateKAFKAConfigs() throws AmbariException { - AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); - Clusters clusters = ambariManagementController.getClusters(); - Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters); - - for (final Cluster cluster : clusterMap.values()) { - Set<String> installedServices = cluster.getServices().keySet(); - - if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) { - Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG); - if (kafkaBroker != null) { - String listenersPropertyValue = kafkaBroker.getProperties().get("listeners"); - if (StringUtils.isNotEmpty(listenersPropertyValue)) { - String newListenersPropertyValue = listenersPropertyValue.replaceAll("PLAINTEXT", "PLAINTEXTSASL"); - updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false); - } - } - } - } - } - protected void updateHIVEConfigs() throws AmbariException { AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/9ffd91a3/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java 
b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java index 2093b97..d653907 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java @@ -55,6 +55,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; + +import javax.persistence.EntityManager; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.ActionManager; @@ -116,12 +119,8 @@ import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; import org.junit.rules.TemporaryFolder; -import org.springframework.security.crypto.password.PasswordEncoder; import com.google.common.collect.Maps; import com.google.gson.Gson; @@ -132,6 +131,8 @@ import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.Provider; +import org.springframework.security.crypto.password.PasswordEncoder; + public class UpgradeCatalog240Test { private static final String CAPACITY_SCHEDULER_CONFIG_TYPE = "capacity-scheduler"; private static final String WEBHCAT_SITE_CONFIG_TYPE = "webhcat-site"; @@ -584,7 +585,6 @@ public class UpgradeCatalog240Test { Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML"); Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert"); Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties"); - Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs"); Capture<String> capturedStatements = 
newCapture(CaptureType.ALL); @@ -634,7 +634,6 @@ public class UpgradeCatalog240Test { .addMockedMethod(updateRecoveryConfigurationDML) .addMockedMethod(removeAtlasMetaserverAlert) .addMockedMethod(updateRangerHbasePluginProperties) - .addMockedMethod(updateKAFKAConfigs) .createMock(); Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor"); @@ -676,7 +675,6 @@ public class UpgradeCatalog240Test { upgradeCatalog240.updateRecoveryConfigurationDML(); upgradeCatalog240.removeAtlasMetaserverAlert(); upgradeCatalog240.updateRangerHbasePluginProperties(); - upgradeCatalog240.updateKAFKAConfigs(); replay(upgradeCatalog240, dbAccessor); @@ -1111,63 +1109,6 @@ public class UpgradeCatalog240Test { assertTrue(Maps.difference(newPropertiesYarnEnv, updatedProperties).areEqual()); } - @Test - public void testUpdateKAFKAConfigs() throws Exception{ - EasyMockSupport easyMockSupport = new EasyMockSupport(); - final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class); - final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class); - final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class); - - final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class); - expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{ - put("listeners", "PLAINTEXT://localhost:PLAINTEXT6667,PLAINTEXTSSL://localhost:6666PLAINTEXT"); - }} - ).anyTimes(); - - final Injector mockInjector = Guice.createInjector(new AbstractModule() { - @Override - protected void configure() { - bind(AmbariManagementController.class).toInstance(mockAmbariManagementController); - bind(Clusters.class).toInstance(mockClusters); - bind(EntityManager.class).toInstance(entityManager); - bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); - bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); - 
bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class)); - } - }); - - expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once(); - expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ - put("normal", mockClusterExpected); - }}).atLeastOnce(); - expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce(); - expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS); - expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() { - { - put("KAFKA", null); - } - }).atLeastOnce(); - - UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class) - .withConstructor(Injector.class) - .withArgs(mockInjector) - .addMockedMethod("updateConfigurationProperties", String.class, - Map.class, boolean.class, boolean.class) - .createMock(); - - Map<String, String> expectedUpdates = new HashMap<>(); - expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:PLAINTEXTSASL6667,PLAINTEXTSASLSSL://localhost:6666PLAINTEXTSASL"); - - upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates, - true, false); - expectLastCall().once(); - - easyMockSupport.replayAll(); - replay(upgradeCatalog240); - upgradeCatalog240.updateKAFKAConfigs(); - easyMockSupport.verifyAll(); - } - /** * Test that queue names updated in mapred-site, webhcat-site, tez-site, yarn-env * @throws Exception
