Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 4b22f4d87 -> cbfe96b16


AMBARI-13390 Unable to set user value for kafka-broker/kafka.metrics.reporters 
(dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/08c431d4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/08c431d4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/08c431d4

Branch: refs/heads/branch-2.1
Commit: 08c431d4bbb3b2bd58e830009350fb88ba570618
Parents: 4b22f4d
Author: Dmytro Sen <[email protected]>
Authored: Tue Oct 13 16:11:43 2015 +0300
Committer: Dmytro Sen <[email protected]>
Committed: Thu Oct 15 17:58:24 2015 +0300

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog213.java       | 38 +++++++++++++
 .../0.8.1.2.2/configuration/kafka-broker.xml    |  2 +-
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    |  5 +-
 .../KAFKA/0.8.1.2.2/package/scripts/params.py   | 10 ----
 .../server/upgrade/UpgradeCatalog213Test.java   | 56 +++++++++++++++++++-
 5 files changed, 95 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index b60404e..4217cdd 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -65,6 +65,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -73,6 +74,7 @@ import java.util.UUID;
 public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
   private static final String STORM_SITE = "storm-site";
+  private static final String KAFKA_BROKER = "kafka-broker";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
   private static final String HBASE_ENV_CONFIG = "hbase-env";
@@ -156,6 +158,7 @@ public class UpgradeCatalog213 extends 
AbstractUpgradeCatalog {
     updateAMSConfigs();
     updateHbaseEnvConfig();
     updateAlertDefinitions();
+    updateKafkaConfigs();
   }
 
   /**
@@ -643,6 +646,41 @@ public class UpgradeCatalog213 extends 
AbstractUpgradeCatalog {
 
   }
 
+  protected void updateKafkaConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = 
injector.getInstance(AmbariManagementController.class);
+    Clusters clusters = ambariManagementController.getClusters();
+
+    if (clusters != null) {
+      Map<String, Cluster> clusterMap = clusters.getClusters();
+      if (clusterMap != null && !clusterMap.isEmpty()) {
+        for (final Cluster cluster : clusterMap.values()) {
+          Set<String> installedServices =cluster.getServices().keySet();
+          Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER);
+          if (kafkaBroker != null) {
+            Map<String, String> newProperties = new HashMap<>();
+            Map<String, String> kafkaBrokerProperties = 
kafkaBroker.getProperties();
+            String kafkaMetricsReporters = 
kafkaBrokerProperties.get("kafka.metrics.reporters");
+            if (kafkaMetricsReporters == null ||
+              "{{kafka_metrics_reporters}}".equals(kafkaMetricsReporters)) {
+
+              if (installedServices.contains("AMBARI_METRICS")) {
+                newProperties.put("kafka.metrics.reporters", 
"org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter");
+              } else if (installedServices.contains("GANGLIA")) {
+                newProperties.put("kafka.metrics.reporters", 
"kafka.ganglia.KafkaGangliaMetricsReporter");
+              } else {
+                newProperties.put("kafka.metrics.reporters", " ");
+              }
+
+            }
+            if (!newProperties.isEmpty()) {
+              updateConfigurationPropertiesForCluster(cluster, KAFKA_BROKER, 
newProperties, true, true);
+            }
+          }
+        }
+      }
+    }
+  }
+
   protected String updateAmsEnvContent(String oldContent) {
     if (oldContent == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
index 1cbfade..6a98648 100644
--- 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
@@ -271,7 +271,7 @@
   </property>
   <property>
     <name>kafka.metrics.reporters</name>
-    <value>{{kafka_metrics_reporters}}</value>
+    
<value>org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter</value>
    <description>
      Comma-separated list of Kafka metrics reporter classes; defaults to the AMS
      timeline reporter, with the Ganglia reporter used when Ganglia is installed instead
    </description>

http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 4d28c41..7f9b4ed 100644
--- 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -21,8 +21,7 @@ limitations under the License.
 from resource_management import *
 from resource_management.libraries.resources.properties_file import 
PropertiesFile
 from resource_management.libraries.resources.template_config import 
TemplateConfig
-import sys, os
-from copy import deepcopy
+import os
 
 def kafka():
     import params
@@ -53,8 +52,6 @@ def kafka():
     else:
         kafka_server_config['host.name'] = params.hostname
 
-
-    kafka_server_config['kafka.metrics.reporters'] = 
params.kafka_metrics_reporters
     if(params.has_metric_collector):
             kafka_server_config['kafka.timeline.metrics.host'] = 
params.metric_collector_host
             kafka_server_config['kafka.timeline.metrics.port'] = 
params.metric_collector_port

http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 0a55504..dc0c087 100644
--- 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ 
b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -85,13 +85,9 @@ if 'ganglia_server_host' in config['clusterHostInfo'] and \
 else:
   ganglia_installed = False
 
-kafka_metrics_reporters=""
 metric_collector_host = ""
 metric_collector_port = ""
 
-if ganglia_installed:
-  kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter"
-
 ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
 
@@ -101,12 +97,6 @@ if has_metric_collector:
   if metric_collector_port and metric_collector_port.find(':') != -1:
     metric_collector_port = metric_collector_port.split(':')[1]
 
-  if not len(kafka_metrics_reporters) == 0:
-      kafka_metrics_reporters = kafka_metrics_reporters + ','
-
-  kafka_metrics_reporters = kafka_metrics_reporters + 
"org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"
-
-
 # Security-related params
 security_enabled = config['configurations']['cluster-env']['security_enabled']
 kafka_kerberos_enabled = ('security.inter.broker.protocol' in 
config['configurations']['kafka-broker'] and

http://git-wip-us.apache.org/repos/asf/ambari/blob/08c431d4/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index d04e4c9..d3c615e 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -48,6 +48,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.StackInfo;
@@ -172,6 +173,7 @@ public class UpgradeCatalog213Test {
     Method bootstrapRepoVersionForHDP21 = 
UpgradeCatalog213.class.getDeclaredMethod("bootstrapRepoVersionForHDP21");
     Method updateStormConfigs = 
UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
     Method updateAMSConfigs = 
UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+    Method updateKafkaConfigs = 
UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs");
     Method updateHbaseEnvConfig = 
UpgradeCatalog213.class.getDeclaredMethod("updateHbaseEnvConfig");
     Method addNewConfigurationsFromXml = 
AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
 
@@ -184,6 +186,7 @@ public class UpgradeCatalog213Test {
         .addMockedMethod(bootstrapRepoVersionForHDP21)
         .addMockedMethod(updateStormConfigs)
         .addMockedMethod(updateHbaseEnvConfig)
+        .addMockedMethod(updateKafkaConfigs)
         .createMock();
 
     upgradeCatalog213.addNewConfigurationsFromXml();
@@ -200,6 +203,8 @@ public class UpgradeCatalog213Test {
     expectLastCall().once();
     upgradeCatalog213.updateAlertDefinitions();
     expectLastCall().once();
+    upgradeCatalog213.updateKafkaConfigs();
+    expectLastCall().once();
 
     replay(upgradeCatalog213);
 
@@ -503,6 +508,55 @@ public class UpgradeCatalog213Test {
     Assert.assertEquals(expected, 
upgradeCatalog213.modifyJournalnodeProcessAlertSource(alertSource));
   }
 
+  @Test
+  public void testUpdateKafkaConfigs() throws Exception {
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController mockAmbariManagementController = 
easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final ConfigHelper mockConfigHelper = 
easyMockSupport.createMock(ConfigHelper.class);
+
+    final Clusters mockClusters = 
easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = 
easyMockSupport.createNiceMock(Cluster.class);
+    final Map<String, String> propertiesAmsEnv = new HashMap<String, String>() 
{
+      {
+        put("kafka.metrics.reporters", "{{kafka_metrics_reporters}}");
+      }
+    };
+    final Map<String, Service> installedServices = new HashMap<String, 
Service>() {
+      {
+        put("KAFKA", null);
+        put("AMBARI_METRICS", null);
+      }
+    };
+
+    final Config mockAmsEnv = easyMockSupport.createNiceMock(Config.class);
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        
bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(ConfigHelper.class).toInstance(mockConfigHelper);
+        bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
+
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+      }
+    });
+
+    
expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+    expect(mockClusters.getClusters()).andReturn(new HashMap<String, 
Cluster>() {{
+      put("normal", mockClusterExpected);
+    }}).once();
+
+    
expect(mockClusterExpected.getServices()).andReturn(installedServices).atLeastOnce();
+    
expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(mockAmsEnv).atLeastOnce();
+    
expect(mockAmsEnv.getProperties()).andReturn(propertiesAmsEnv).atLeastOnce();
+
+    easyMockSupport.replayAll();
+    mockInjector.getInstance(UpgradeCatalog213.class).updateKafkaConfigs();
+    easyMockSupport.verifyAll();
+  }
+
   /**
    * @param dbAccessor
    * @return
@@ -582,4 +636,4 @@ public class UpgradeCatalog213Test {
       Assert.assertEquals("upgrade_type", upgradeTypeCol.getName());
     }
   }
-}
\ No newline at end of file
+}

Reply via email to