AMBARI-20208 Atlas kafka servers should be configured using kafka listeners (Vishal Suvagia via mugdha)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/56af5741 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/56af5741 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/56af5741 Branch: refs/heads/branch-feature-AMBARI-12556 Commit: 56af5741fa0fb7398438fe5bac2cb207c0f93df5 Parents: 69816f9 Author: Vishal Suvagia <vishalsuva...@yahoo.com> Authored: Thu Mar 2 14:56:45 2017 +0530 Committer: Mugdha Varadkar <mug...@apache.org> Committed: Fri Mar 3 10:50:24 2017 +0530 ---------------------------------------------------------------------- .../configuration/application-properties.xml | 8 +++--- .../stacks/HDP/2.5/services/stack_advisor.py | 21 +++++++++++++++ .../stacks/2.5/common/test_stack_advisor.py | 28 +++++++++++++++++++- 3 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/56af5741/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml index 15cc73b..70af02c 100644 --- a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml +++ b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml @@ -148,10 +148,10 @@ <value/> <description>Comma separated list of Kafka broker endpoints in host:port form</description> <depends-on> - <property> - <type>kafka-broker</type> - <name>port</name> - </property> + <property> + <type>kafka-broker</type> + <name>listeners</name> + </property> </depends-on> <on-ambari-upgrade add="false"/> </property> http://git-wip-us.apache.org/repos/asf/ambari/blob/56af5741/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py index 85e632d..a9fd567 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py @@ -593,6 +593,27 @@ class HDP25StackAdvisor(HDP24StackAdvisor): else: kafka_broker_port = '6667' + if 'kafka-broker' in services['configurations'] and 'listeners' in services['configurations']['kafka-broker']['properties']: + kafka_server_listeners = services['configurations']['kafka-broker']['properties']['listeners'] + else: + kafka_server_listeners = 'PLAINTEXT://localhost:6667' + + security_enabled = self.isSecurityEnabled(services) + + if ',' in kafka_server_listeners and len(kafka_server_listeners.split(',')) > 1: + for listener in kafka_server_listeners.split(','): + listener = listener.strip().split(':') + if len(listener) == 3: + if 'SASL' in listener[0] and security_enabled: + kafka_broker_port = listener[2] + break + elif 'SASL' not in listener[0] and not security_enabled: + kafka_broker_port = listener[2] + else: + listener = kafka_server_listeners.strip().split(':') + if len(listener) == 3: + kafka_broker_port = listener[2] + kafka_host_arr = [] for i in range(len(kafka_hosts)): kafka_host_arr.append(kafka_hosts[i] + ':' + kafka_broker_port) http://git-wip-us.apache.org/repos/asf/ambari/blob/56af5741/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py index a913b35..80d70db 100644 --- a/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py +++ b/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py @@ -4380,13 +4380,19 @@ class TestHDP25StackAdvisor(TestCase): "kafka-broker": { "properties": { "zookeeper.connect": "c6401.ambari.apache.org", - "port": "6667" + "port": "6667", + "listeners": "PLAINTEXT://localhost:6667" } }, 'ranger-atlas-plugin-properties': { 'properties': { 'ranger-atlas-plugin-enabled':'No' } + }, + "cluster-env": { + "properties": { + "security_enabled": "false" + } } }, "changed-configurations": [ ] @@ -4423,6 +4429,26 @@ class TestHDP25StackAdvisor(TestCase): self.stackAdvisor.recommendAtlasConfigurations(configurations, clusterData, services, hosts) self.assertEquals(configurations, expected) + services['configurations']['kafka-broker']['properties']['listeners'] = ' PLAINTEXT://localhost:5522 , PLAINTEXTSASL://localhost:2255 ' + expected['application-properties']['properties']['atlas.kafka.bootstrap.servers'] = 'c6401.ambari.apache.org:5522' + self.stackAdvisor.recommendAtlasConfigurations(configurations, clusterData, services, hosts) + self.assertEquals(configurations, expected) + services['configurations']['cluster-env']['properties']['security_enabled']='true' + services['configurations']['kafka-broker']['properties']['listeners'] = ' PLAINTEXT://localhost:5522 , PLAINTEXTSASL://localhost:2266 ' + expected['application-properties']['properties']['atlas.kafka.bootstrap.servers'] = 'c6401.ambari.apache.org:2266' + self.stackAdvisor.recommendAtlasConfigurations(configurations, clusterData, services, hosts) + self.assertEquals(configurations, expected) + services['configurations']['kafka-broker']['properties']['listeners'] = ' SASL_PLAINTEXT://localhost:2233 , PLAINTEXT://localhost:5577 ' + expected['application-properties']['properties']['atlas.kafka.bootstrap.servers'] = 'c6401.ambari.apache.org:2233' + self.stackAdvisor.recommendAtlasConfigurations(configurations, clusterData, services, hosts) + self.assertEquals(configurations, expected) + + services['configurations']['cluster-env']['properties']['security_enabled']='false' + expected['application-properties']['properties']['atlas.kafka.bootstrap.servers'] = 'c6401.ambari.apache.org:5577' + self.stackAdvisor.recommendAtlasConfigurations(configurations, clusterData, services, hosts) + self.assertEquals(configurations, expected) + + def test_validationAtlasConfigs(self): servicesInfo = [ {