AMBARI-20208 Atlas kafka servers should be configured using kafka listeners 
(Vishal Suvagia via mugdha)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/56af5741
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/56af5741
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/56af5741

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 56af5741fa0fb7398438fe5bac2cb207c0f93df5
Parents: 69816f9
Author: Vishal Suvagia <vishalsuva...@yahoo.com>
Authored: Thu Mar 2 14:56:45 2017 +0530
Committer: Mugdha Varadkar <mug...@apache.org>
Committed: Fri Mar 3 10:50:24 2017 +0530

----------------------------------------------------------------------
 .../configuration/application-properties.xml    |  8 +++---
 .../stacks/HDP/2.5/services/stack_advisor.py    | 21 +++++++++++++++
 .../stacks/2.5/common/test_stack_advisor.py     | 28 +++++++++++++++++++-
 3 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/56af5741/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml
 
b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml
index 15cc73b..70af02c 100644
--- 
a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml
+++ 
b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.2.5/configuration/application-properties.xml
@@ -148,10 +148,10 @@
     <value/>
     <description>Comma separated list of Kafka broker endpoints in host:port 
form</description>
     <depends-on>
-      <property>
-        <type>kafka-broker</type>
-        <name>port</name>
-      </property>
+     <property>
+       <type>kafka-broker</type>
+       <name>listeners</name>
+     </property>
     </depends-on>
     <on-ambari-upgrade add="false"/>
   </property>

http://git-wip-us.apache.org/repos/asf/ambari/blob/56af5741/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py 
b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
index 85e632d..a9fd567 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
@@ -593,6 +593,27 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
       else:
         kafka_broker_port = '6667'
 
+      if 'kafka-broker' in services['configurations'] and 'listeners' in 
services['configurations']['kafka-broker']['properties']:
+        kafka_server_listeners = 
services['configurations']['kafka-broker']['properties']['listeners']
+      else:
+        kafka_server_listeners = 'PLAINTEXT://localhost:6667'
+
+      security_enabled = self.isSecurityEnabled(services)
+
+      if ',' in kafka_server_listeners and 
len(kafka_server_listeners.split(',')) > 1:
+        for listener in kafka_server_listeners.split(','):
+          listener = listener.strip().split(':')
+          if len(listener) == 3:
+            if 'SASL' in listener[0] and security_enabled:
+              kafka_broker_port = listener[2]
+              break
+            elif  'SASL' not in listener[0] and not security_enabled:
+              kafka_broker_port = listener[2]
+      else:
+        listener = kafka_server_listeners.strip().split(':')
+        if len(listener) == 3:
+          kafka_broker_port  = listener[2]
+
       kafka_host_arr = []
       for i in range(len(kafka_hosts)):
         kafka_host_arr.append(kafka_hosts[i] + ':' + kafka_broker_port)

http://git-wip-us.apache.org/repos/asf/ambari/blob/56af5741/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py 
b/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py
index a913b35..80d70db 100644
--- a/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.5/common/test_stack_advisor.py
@@ -4380,13 +4380,19 @@ class TestHDP25StackAdvisor(TestCase):
         "kafka-broker": {
           "properties": {
             "zookeeper.connect": "c6401.ambari.apache.org",
-            "port": "6667"
+            "port": "6667",
+            "listeners": "PLAINTEXT://localhost:6667"
           }
         },
         'ranger-atlas-plugin-properties': {
           'properties': {
             'ranger-atlas-plugin-enabled':'No'
           }
+        },
+        "cluster-env": {
+          "properties": {
+            "security_enabled": "false"
+          }
         }
       },
       "changed-configurations": [ ]
@@ -4423,6 +4429,26 @@ class TestHDP25StackAdvisor(TestCase):
     self.stackAdvisor.recommendAtlasConfigurations(configurations, 
clusterData, services, hosts)
     self.assertEquals(configurations, expected)
 
+    services['configurations']['kafka-broker']['properties']['listeners'] = '  
PLAINTEXT://localhost:5522  ,  PLAINTEXTSASL://localhost:2255   '
+    
expected['application-properties']['properties']['atlas.kafka.bootstrap.servers']
 = 'c6401.ambari.apache.org:5522'
+    self.stackAdvisor.recommendAtlasConfigurations(configurations, 
clusterData, services, hosts)
+    self.assertEquals(configurations, expected)
+    
services['configurations']['cluster-env']['properties']['security_enabled']='true'
+    services['configurations']['kafka-broker']['properties']['listeners'] = '  
PLAINTEXT://localhost:5522  ,  PLAINTEXTSASL://localhost:2266   '
+    
expected['application-properties']['properties']['atlas.kafka.bootstrap.servers']
 = 'c6401.ambari.apache.org:2266'
+    self.stackAdvisor.recommendAtlasConfigurations(configurations, 
clusterData, services, hosts)
+    self.assertEquals(configurations, expected)
+    services['configurations']['kafka-broker']['properties']['listeners'] = '  
SASL_PLAINTEXT://localhost:2233   , PLAINTEXT://localhost:5577  '
+    
expected['application-properties']['properties']['atlas.kafka.bootstrap.servers']
 = 'c6401.ambari.apache.org:2233'
+    self.stackAdvisor.recommendAtlasConfigurations(configurations, 
clusterData, services, hosts)
+    self.assertEquals(configurations, expected)
+
+    
services['configurations']['cluster-env']['properties']['security_enabled']='false'
+    
expected['application-properties']['properties']['atlas.kafka.bootstrap.servers']
 = 'c6401.ambari.apache.org:5577'
+    self.stackAdvisor.recommendAtlasConfigurations(configurations, 
clusterData, services, hosts)
+    self.assertEquals(configurations, expected)
+
+
   def test_validationAtlasConfigs(self):
     servicesInfo = [
       {

Reply via email to