Repository: ambari
Updated Branches:
  refs/heads/trunk 6bcdcd9ac -> 95d85dbc4
AMBARI-13373 hdfs balancer via ambari fails to run once HA is enabled (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95d85dbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95d85dbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95d85dbc

Branch: refs/heads/trunk
Commit: 95d85dbc4a890678a9565b649c037db97076fe67
Parents: 6bcdcd9
Author: Dmytro Sen <[email protected]>
Authored: Thu Oct 15 17:33:02 2015 +0300
Committer: Dmytro Sen <[email protected]>
Committed: Thu Oct 15 17:33:02 2015 +0300

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog210.java       | 37 +++++----------
 .../server/upgrade/UpgradeCatalog213.java       | 18 ++++++++
 .../server/upgrade/UpgradeCatalog213Test.java   | 47 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
index 12a1f44..308e7c9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
@@ -122,6 +122,7 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
   private static final String TOPOLOGY_HOST_REQUEST_TABLE = "topology_host_request";
   private static final String TOPOLOGY_HOST_TASK_TABLE = "topology_host_task";
   private static final String TOPOLOGY_LOGICAL_TASK_TABLE = "topology_logical_task";
+  private static final String HDFS_SITE_CONFIG = "hdfs-site";
 
   // constants for stack table changes
   private static final String STACK_ID_COLUMN_NAME = "stack_id";
@@ -1458,7 +1459,7 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
     /***
      * Update dfs.namenode.rpc-address set hostname instead of localhost
      */
-    if (cluster.getDesiredConfigByType("hdfs-site") != null && !cluster.getHosts("HDFS","NAMENODE").isEmpty()) {
+    if (cluster.getDesiredConfigByType(HDFS_SITE_CONFIG) != null && !cluster.getHosts("HDFS","NAMENODE").isEmpty()) {
 
       URI nameNodeRpc = null;
       String hostName = cluster.getHosts("HDFS","NAMENODE").iterator().next();
@@ -1467,33 +1468,19 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
           cluster.getDesiredConfigByType("core-site").getProperties().get("fs.defaultFS") != null) {
         try {
           if (isNNHAEnabled(cluster)) {
-            String nn1RpcAddress = null;
-            Config hdfsSiteConfig = cluster.getDesiredConfigByType("hdfs-site");
-            Map<String, String> properties = hdfsSiteConfig.getProperties();
-            String nameServices = properties.get("dfs.nameservices");
-            if (!StringUtils.isEmpty(nameServices)) {
-              String namenodes = properties.get(String.format("dfs.ha.namenodes.%s",nameServices));
-              if (!StringUtils.isEmpty(namenodes) && namenodes.split(",").length > 1) {
-                nn1RpcAddress = properties.get(String.format("dfs.namenode.rpc-address.%s.%s", nameServices,
-                    namenodes.split(",")[0]));
-              }
-            }
-            if (!StringUtils.isEmpty(nn1RpcAddress)) {
-              if (!nn1RpcAddress.startsWith("http")) {
-                nn1RpcAddress = "http://" + nn1RpcAddress;
-              }
-              nameNodeRpc= new URI(nn1RpcAddress);
-            } else {
-              // set default value
-              nameNodeRpc= new URI("http://localhost:8020");
-            }
+            // NN HA enabled
+            // Remove dfs.namenode.rpc-address property
+            Set<String> removePropertiesSet = new HashSet<>();
+            removePropertiesSet.add("dfs.namenode.rpc-address");
+            removeConfigurationPropertiesFromCluster(cluster, HDFS_SITE_CONFIG, removePropertiesSet);
           } else {
+            // NN HA disabled
             nameNodeRpc = new URI(cluster.getDesiredConfigByType("core-site").getProperties().get("fs.defaultFS"));
+            Map<String, String> hdfsProp = new HashMap<String, String>();
+            hdfsProp.put("dfs.namenode.rpc-address", hostName + ":" + nameNodeRpc.getPort());
+            updateConfigurationPropertiesForCluster(cluster, HDFS_SITE_CONFIG,
+                hdfsProp, false, false);
           }
-          Map<String, String> hdfsProp = new HashMap<String, String>();
-          hdfsProp.put("dfs.namenode.rpc-address", hostName + ":" + nameNodeRpc.getPort());
-          updateConfigurationPropertiesForCluster(cluster, "hdfs-site",
-              hdfsProp, true, false);
         } catch (URISyntaxException e) {
           e.printStackTrace();
         }
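
[Editor's note] The isNNHAEnabled(cluster) helper used above is inherited from AbstractUpgradeCatalog and is not part of this diff. Judging from the branch removed above, which read dfs.nameservices and dfs.ha.namenodes.<nameservice> out of hdfs-site, the check plausibly has the following shape. This is a minimal sketch mirroring that removed code, not the actual AbstractUpgradeCatalog implementation:

  // Sketch: NameNode HA is treated as enabled when hdfs-site declares a
  // nameservice that lists more than one NameNode for it.
  protected boolean isNNHAEnabled(Cluster cluster) {
    Config hdfsSite = cluster.getDesiredConfigByType("hdfs-site");
    if (hdfsSite == null) {
      return false;
    }
    Map<String, String> properties = hdfsSite.getProperties();
    String nameServices = properties.get("dfs.nameservices");
    if (StringUtils.isEmpty(nameServices)) {
      return false;
    }
    String namenodes = properties.get(String.format("dfs.ha.namenodes.%s", nameServices));
    return !StringUtils.isEmpty(namenodes) && namenodes.split(",").length > 1;
  }
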
http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 3eeb6b9..e0ae7c1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -49,6 +50,7 @@ import java.util.UUID;
 public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
   private static final String STORM_SITE = "storm-site";
+  private static final String HDFS_SITE_CONFIG = "hdfs-site";
   private static final String KAFKA_BROKER = "kafka-broker";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
@@ -119,6 +121,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     updateAlertDefinitions();
     updateStormConfigs();
     updateAMSConfigs();
+    updateHDFSConfigs();
     updateHbaseEnvConfig();
     updateKafkaConfigs();
   }
@@ -192,6 +195,21 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     return rootJson.toString();
   }
 
+  protected void updateHDFSConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(
+        AmbariManagementController.class);
+    Map<String, Cluster> clusterMap = getCheckedClusterMap(ambariManagementController.getClusters());
+
+    for (final Cluster cluster : clusterMap.values()) {
+      // Remove dfs.namenode.rpc-address property when NN HA is enabled
+      if (cluster.getDesiredConfigByType(HDFS_SITE_CONFIG) != null && isNNHAEnabled(cluster)) {
+        Set<String> removePropertiesSet = new HashSet<>();
+        removePropertiesSet.add("dfs.namenode.rpc-address");
+        removeConfigurationPropertiesFromCluster(cluster, HDFS_SITE_CONFIG, removePropertiesSet);
+      }
+    }
+  }
+
   protected void updateStormConfigs() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
     Clusters clusters = ambariManagementController.getClusters();
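
[Editor's note] To make the intent of updateHDFSConfigs() concrete: once NameNode HA is enabled, a leftover single-host dfs.namenode.rpc-address in hdfs-site steers clients such as the hdfs balancer at one fixed NameNode instead of the HA nameservice, which is the failure AMBARI-13373 describes. The following standalone snippet illustrates the upgrade's effect on the property map; the nameservice and host names are hypothetical example values:

  import java.util.HashMap;
  import java.util.Map;

  public class RpcAddressRemovalDemo {
    public static void main(String[] args) {
      // A hypothetical HA-enabled hdfs-site as it might look before the upgrade.
      Map<String, String> hdfsSite = new HashMap<String, String>();
      hdfsSite.put("dfs.nameservices", "mycluster");
      hdfsSite.put("dfs.ha.namenodes.mycluster", "nn1,nn2");
      hdfsSite.put("dfs.namenode.rpc-address.mycluster.nn1", "c6401.example.com:8020");
      hdfsSite.put("dfs.namenode.rpc-address.mycluster.nn2", "c6402.example.com:8020");
      hdfsSite.put("dfs.namenode.rpc-address", "c6401.example.com:8020"); // stale non-HA leftover

      // What updateHDFSConfigs() effectively does for an HA cluster: drop the stale
      // key so clients resolve NameNodes only via the per-nameservice properties.
      hdfsSite.remove("dfs.namenode.rpc-address");

      System.out.println(hdfsSite); // no bare dfs.namenode.rpc-address remains
    }
  }
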
http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index c945186..15234db 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -50,6 +50,7 @@ import javax.persistence.EntityManager;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.easymock.EasyMock.createMockBuilder;
@@ -95,6 +96,7 @@ public class UpgradeCatalog213Test {
   @Test
   public void testExecuteDMLUpdates() throws Exception {
     Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+    Method updateHDFSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateHDFSConfigs");
     Method updateKafkaConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs");
     Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
     Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
@@ -103,6 +105,7 @@ public class UpgradeCatalog213Test {
 
     UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
         .addMockedMethod(updateAMSConfigs)
+        .addMockedMethod(updateHDFSConfigs)
         .addMockedMethod(updateStormConfigs)
         .addMockedMethod(addNewConfigurationsFromXml)
         .addMockedMethod(updateHbaseEnvConfig)
@@ -122,6 +125,8 @@
     expectLastCall().once();
     upgradeCatalog213.updateKafkaConfigs();
     expectLastCall().once();
+    upgradeCatalog213.updateHDFSConfigs();
+    expectLastCall().once();
 
     replay(upgradeCatalog213);
 
@@ -218,6 +223,48 @@
   }
 
   @Test
+  public void testUpdateHDFSConfiguration() throws Exception {
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class);
+
+    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+    final Config mockHdfsSite = easyMockSupport.createNiceMock(Config.class);
+
+    final Map<String, String> propertiesExpectedHdfs = new HashMap<String, String>();
+    propertiesExpectedHdfs.put("dfs.namenode.rpc-address", "nn.rpc.address");
+    propertiesExpectedHdfs.put("dfs.nameservices", "nn1");
+    propertiesExpectedHdfs.put("dfs.ha.namenodes.nn1", "value");
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(ConfigHelper.class).toInstance(mockConfigHelper);
+        bind(Clusters.class).toInstance(mockClusters);
+
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+      }
+    });
+
+    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+    expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", mockClusterExpected);
+    }}).once();
+
+    // Expected operation
+    expect(mockClusterExpected.getDesiredConfigByType("hdfs-site")).andReturn(mockHdfsSite).atLeastOnce();
+    expect(mockHdfsSite.getProperties()).andReturn(propertiesExpectedHdfs).anyTimes();
+
+    easyMockSupport.replayAll();
+    mockInjector.getInstance(UpgradeCatalog213.class).updateHDFSConfigs();
+    easyMockSupport.verifyAll();
+  }
+
+  @Test
   public void testUpdateAmsHbaseEnvContent() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
     Method updateAmsHbaseEnvContent = UpgradeCatalog213.class.getDeclaredMethod("updateAmsHbaseEnvContent", String.class);
     UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);

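[Editor's note] With nice mocks, testUpdateHDFSConfiguration above verifies that updateHDFSConfigs() runs to completion against an HA-style hdfs-site, but it does not assert that the property is actually removed. A stricter variant could partially mock the inherited helper and verify the call itself, roughly as follows. This is a hypothetical sketch, not part of the commit: it reuses the EasyMock imports and the mockInjector/mockClusterExpected setup from the test above, assumes java.util.Set and java.util.Collections are imported, and note that under the HA check sketched earlier, dfs.ha.namenodes.<nameservice> would have to list more than one NameNode (e.g. "nn1,nn2") for the removal branch to fire at all:

  Method removeMethod = AbstractUpgradeCatalog.class.getDeclaredMethod(
      "removeConfigurationPropertiesFromCluster", Cluster.class, String.class, Set.class);

  // Partial mock: run the real updateHDFSConfigs(), mock only the removal helper.
  UpgradeCatalog213 catalog = createMockBuilder(UpgradeCatalog213.class)
      .withConstructor(mockInjector)
      .addMockedMethod(removeMethod)
      .createMock();

  // Expect exactly one removal of dfs.namenode.rpc-address from hdfs-site.
  catalog.removeConfigurationPropertiesFromCluster(mockClusterExpected, "hdfs-site",
      Collections.singleton("dfs.namenode.rpc-address"));
  expectLastCall().once();

  replay(catalog);
  catalog.updateHDFSConfigs();
  verify(catalog);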