Repository: ambari Updated Branches: refs/heads/branch-2.1 4b22f4d87 -> cbfe96b16
AMBARI-13390 Unable to set user value for kafka-broker/kafka.metrics.reporters (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/08c431d4 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/08c431d4 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/08c431d4 Branch: refs/heads/branch-2.1 Commit: 08c431d4bbb3b2bd58e830009350fb88ba570618 Parents: 4b22f4d Author: Dmytro Sen <[email protected]> Authored: Tue Oct 13 16:11:43 2015 +0300 Committer: Dmytro Sen <[email protected]> Committed: Thu Oct 15 17:58:24 2015 +0300 ---------------------------------------------------------------------- .../server/upgrade/UpgradeCatalog213.java | 38 +++++++++++++ .../0.8.1.2.2/configuration/kafka-broker.xml | 2 +- .../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 5 +- .../KAFKA/0.8.1.2.2/package/scripts/params.py | 10 ---- .../server/upgrade/UpgradeCatalog213Test.java | 56 +++++++++++++++++++- 5 files changed, 95 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java index b60404e..4217cdd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java @@ -65,6 +65,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -73,6 +74,7 @@ import java.util.UUID; public class UpgradeCatalog213 extends AbstractUpgradeCatalog { private static final String STORM_SITE = "storm-site"; + private static final String KAFKA_BROKER = "kafka-broker"; private static final String AMS_ENV = "ams-env"; private static final String AMS_HBASE_ENV = "ams-hbase-env"; private static final String HBASE_ENV_CONFIG = "hbase-env"; @@ -156,6 +158,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog { updateAMSConfigs(); updateHbaseEnvConfig(); updateAlertDefinitions(); + updateKafkaConfigs(); } /** @@ -643,6 +646,41 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog { } + protected void updateKafkaConfigs() throws AmbariException { + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + Clusters clusters = ambariManagementController.getClusters(); + + if (clusters != null) { + Map<String, Cluster> clusterMap = clusters.getClusters(); + if (clusterMap != null && !clusterMap.isEmpty()) { + for (final Cluster cluster : clusterMap.values()) { + Set<String> installedServices =cluster.getServices().keySet(); + Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER); + if (kafkaBroker != null) { + Map<String, String> newProperties = new HashMap<>(); + Map<String, String> kafkaBrokerProperties = kafkaBroker.getProperties(); + String kafkaMetricsReporters = kafkaBrokerProperties.get("kafka.metrics.reporters"); + if (kafkaMetricsReporters == null || + "{{kafka_metrics_reporters}}".equals(kafkaMetricsReporters)) { + + if (installedServices.contains("AMBARI_METRICS")) { + newProperties.put("kafka.metrics.reporters", "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"); + } else if (installedServices.contains("GANGLIA")) { + newProperties.put("kafka.metrics.reporters", "kafka.ganglia.KafkaGangliaMetricsReporter"); + } else { + newProperties.put("kafka.metrics.reporters", " "); + } + + } + if (!newProperties.isEmpty()) { + updateConfigurationPropertiesForCluster(cluster, KAFKA_BROKER, newProperties, true, true); + } + } + } + } + } + } + protected String updateAmsEnvContent(String oldContent) { if (oldContent == null) { return null; http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml index 1cbfade..6a98648 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml @@ -271,7 +271,7 @@ </property> <property> <name>kafka.metrics.reporters</name> - <value>{{kafka_metrics_reporters}}</value> + <value>org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter</value> <description> kafka ganglia metrics reporter and kafka timeline metrics reporter </description> http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py index 4d28c41..7f9b4ed 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py @@ -21,8 +21,7 @@ limitations under the License. from resource_management import * from resource_management.libraries.resources.properties_file import PropertiesFile from resource_management.libraries.resources.template_config import TemplateConfig -import sys, os -from copy import deepcopy +import os def kafka(): import params @@ -53,8 +52,6 @@ def kafka(): else: kafka_server_config['host.name'] = params.hostname - - kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters if(params.has_metric_collector): kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py index 0a55504..dc0c087 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py @@ -85,13 +85,9 @@ if 'ganglia_server_host' in config['clusterHostInfo'] and \ else: ganglia_installed = False -kafka_metrics_reporters="" metric_collector_host = "" metric_collector_port = "" -if ganglia_installed: - kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter" - ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", []) has_metric_collector = not len(ams_collector_hosts) == 0 @@ -101,12 +97,6 @@ if has_metric_collector: if metric_collector_port and metric_collector_port.find(':') != -1: metric_collector_port = metric_collector_port.split(':')[1] - if not len(kafka_metrics_reporters) == 0: - kafka_metrics_reporters = kafka_metrics_reporters + ',' - - kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter" - - # Security-related params security_enabled = config['configurations']['cluster-env']['security_enabled'] kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java index d04e4c9..d3c615e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java @@ -48,6 +48,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ConfigHelper; +import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.RepositoryVersionState; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.StackInfo; @@ -172,6 +173,7 @@ public class UpgradeCatalog213Test { Method bootstrapRepoVersionForHDP21 = UpgradeCatalog213.class.getDeclaredMethod("bootstrapRepoVersionForHDP21"); Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs"); Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs"); + Method updateKafkaConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs"); Method updateHbaseEnvConfig = UpgradeCatalog213.class.getDeclaredMethod("updateHbaseEnvConfig"); Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml"); @@ -184,6 +186,7 @@ public class UpgradeCatalog213Test { .addMockedMethod(bootstrapRepoVersionForHDP21) .addMockedMethod(updateStormConfigs) .addMockedMethod(updateHbaseEnvConfig) + .addMockedMethod(updateKafkaConfigs) .createMock(); upgradeCatalog213.addNewConfigurationsFromXml(); @@ -200,6 +203,8 @@ public class UpgradeCatalog213Test { expectLastCall().once(); upgradeCatalog213.updateAlertDefinitions(); expectLastCall().once(); + upgradeCatalog213.updateKafkaConfigs(); + expectLastCall().once(); replay(upgradeCatalog213); @@ -503,6 +508,55 @@ public class UpgradeCatalog213Test { Assert.assertEquals(expected, upgradeCatalog213.modifyJournalnodeProcessAlertSource(alertSource)); } + @Test + public void testUpdateKafkaConfigs() throws Exception { + EasyMockSupport easyMockSupport = new EasyMockSupport(); + final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class); + final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class); + + final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class); + final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class); + final Map<String, String> propertiesAmsEnv = new HashMap<String, String>() { + { + put("kafka.metrics.reporters", "{{kafka_metrics_reporters}}"); + } + }; + final Map<String, Service> installedServices = new HashMap<String, Service>() { + { + put("KAFKA", null); + put("AMBARI_METRICS", null); + } + }; + + final Config mockAmsEnv = easyMockSupport.createNiceMock(Config.class); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(AmbariManagementController.class).toInstance(mockAmbariManagementController); + bind(ConfigHelper.class).toInstance(mockConfigHelper); + bind(Clusters.class).toInstance(mockClusters); + bind(EntityManager.class).toInstance(entityManager); + + bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); + bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + } + }); + + expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once(); + expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ + put("normal", mockClusterExpected); + }}).once(); + + expect(mockClusterExpected.getServices()).andReturn(installedServices).atLeastOnce(); + expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(mockAmsEnv).atLeastOnce(); + expect(mockAmsEnv.getProperties()).andReturn(propertiesAmsEnv).atLeastOnce(); + + easyMockSupport.replayAll(); + mockInjector.getInstance(UpgradeCatalog213.class).updateKafkaConfigs(); + easyMockSupport.verifyAll(); + } + /** * @param dbAccessor * @return @@ -582,4 +636,4 @@ public class UpgradeCatalog213Test { Assert.assertEquals("upgrade_type", upgradeTypeCol.getName()); } } -} \ No newline at end of file +}
