AMBARI-18517 : Changes in upgrade path for Kafka metrics collector hosts config. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8431557b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8431557b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8431557b Branch: refs/heads/branch-2.5 Commit: 8431557b21e46c470b49e3071865d27d56e6cc98 Parents: abd19c6 Author: Aravindan Vijayan <[email protected]> Authored: Mon Nov 14 21:47:43 2016 -0800 Committer: Aravindan Vijayan <[email protected]> Committed: Tue Nov 15 11:06:40 2016 -0800 ---------------------------------------------------------------------- .../server/upgrade/UpgradeCatalog250.java | 51 +++++- .../server/upgrade/UpgradeCatalog250Test.java | 178 +++++++++++++++++-- 2 files changed, 213 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8431557b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java index ebb0007..196a10f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.upgrade; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -133,6 +134,8 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { protected void executeDMLUpdates() throws AmbariException, SQLException { addNewConfigurationsFromXml(); updateAMSConfigs(); + createRoleAuthorizations(); + updateKafkaConfigs(); } protected void updateHostVersionTable() throws SQLException { @@ -201,7 +204,23 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { } /** + * Create new role authorizations: CLUSTER.RUN_CUSTOM_COMMAND and AMBARI.RUN_CUSTOM_COMMAND + * + * @throws SQLException + */ + protected void createRoleAuthorizations() throws SQLException { + LOG.info("Adding authorizations"); + + addRoleAuthorization("CLUSTER.RUN_CUSTOM_COMMAND", "Perform custom cluster-level actions", + Arrays.asList("AMBARI.ADMINISTRATOR:AMBARI", "CLUSTER.ADMINISTRATOR:CLUSTER")); + + addRoleAuthorization("AMBARI.RUN_CUSTOM_COMMAND", "Perform custom administrative actions", + Collections.singletonList("AMBARI.ADMINISTRATOR:AMBARI")); + } + + /** * Creates the servicecomponent_version table + * * @throws SQLException */ private void createComponentVersionTable() throws SQLException { @@ -221,14 +240,38 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { dbAccessor.addPKConstraint(COMPONENT_VERSION_TABLE, COMPONENT_VERSION_PK, "id"); dbAccessor.addFKConstraint(COMPONENT_VERSION_TABLE, COMPONENT_VERSION_FK_COMPONENT, "component_id", - COMPONENT_TABLE, "id", false); + COMPONENT_TABLE, "id", false); dbAccessor.addFKConstraint(COMPONENT_VERSION_TABLE, COMPONENT_VERSION_FK_REPO_VERSION, "repo_version_id", - "repo_version", "repo_version_id", false); + "repo_version", "repo_version_id", false); addSequence("servicecomponent_version_id_seq", 0L, false); } + protected void updateKafkaConfigs() throws AmbariException { + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + Clusters clusters = ambariManagementController.getClusters(); + + if (clusters != null) { + Map<String, Cluster> clusterMap = clusters.getClusters(); + + if (clusterMap != null && !clusterMap.isEmpty()) { + for (final Cluster cluster : clusterMap.values()) { + + Config kafkaBrokerConfig = cluster.getDesiredConfigByType(KAFKA_BROKER); + if (kafkaBrokerConfig != null) { + Map<String, String> kafkaBrokerProperties = kafkaBrokerConfig.getProperties(); + + if (kafkaBrokerProperties != null && kafkaBrokerProperties.containsKey(KAFKA_TIMELINE_METRICS_HOST)) { + LOG.info("Removing kafka.timeline.metrics.host from kafka-broker"); + removeConfigurationPropertiesFromCluster(cluster, KAFKA_BROKER, Collections.singleton("kafka.timeline.metrics.host")); + } + } + } + } + } + } + /** * Alter servicedesiredstate table. * @throws SQLException @@ -238,10 +281,10 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog { // credential_store_supported SMALLINT DEFAULT 0 NOT NULL // credential_store_enabled SMALLINT DEFAULT 0 NOT NULL dbAccessor.addColumn(SERVICE_DESIRED_STATE_TABLE, - new DBColumnInfo(CREDENTIAL_STORE_SUPPORTED_COL, Short.class, null, 0, false)); + new DBColumnInfo(CREDENTIAL_STORE_SUPPORTED_COL, Short.class, null, 0, false)); dbAccessor.addColumn(SERVICE_DESIRED_STATE_TABLE, - new DBColumnInfo(CREDENTIAL_STORE_ENABLED_COL, Short.class, null, 0, false)); + new DBColumnInfo(CREDENTIAL_STORE_ENABLED_COL, Short.class, null, 0, false)); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8431557b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java index 33bba0c..2f562b8 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java @@ -38,11 +38,15 @@ import static org.junit.Assert.assertTrue; import java.lang.reflect.Method; import java.sql.Connection; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -50,6 +54,12 @@ import org.apache.ambari.server.controller.AmbariManagementControllerImpl; import org.apache.ambari.server.controller.KerberosHelper; import org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.orm.DBAccessor; +import org.apache.ambari.server.orm.dao.PermissionDAO; +import org.apache.ambari.server.orm.dao.ResourceTypeDAO; +import org.apache.ambari.server.orm.dao.RoleAuthorizationDAO; +import org.apache.ambari.server.orm.entities.PermissionEntity; +import org.apache.ambari.server.orm.entities.ResourceTypeEntity; +import org.apache.ambari.server.orm.entities.RoleAuthorizationEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; @@ -74,7 +84,7 @@ import com.google.inject.Provider; */ public class UpgradeCatalog250Test { -// private Injector injector; + // private Injector injector; private Provider<EntityManager> entityManagerProvider = createStrictMock(Provider.class); private EntityManager entityManager = createNiceMock(EntityManager.class); @@ -112,16 +122,16 @@ public class UpgradeCatalog250Test { Capture<List<DBAccessor.DBColumnInfo>> capturedComponentVersionColumns = newCapture(); dbAccessor.createTable(eq(UpgradeCatalog250.COMPONENT_VERSION_TABLE), capture(capturedComponentVersionColumns), - eq((String[]) null)); + eq((String[]) null)); dbAccessor.addPKConstraint(eq(UpgradeCatalog250.COMPONENT_VERSION_TABLE), - eq(UpgradeCatalog250.COMPONENT_VERSION_PK), eq("id")); + eq(UpgradeCatalog250.COMPONENT_VERSION_PK), eq("id")); dbAccessor.addFKConstraint(eq(UpgradeCatalog250.COMPONENT_VERSION_TABLE), - eq(UpgradeCatalog250.COMPONENT_VERSION_FK_COMPONENT), eq("component_id"), - eq(UpgradeCatalog250.COMPONENT_TABLE), eq("id"), eq(false)); + eq(UpgradeCatalog250.COMPONENT_VERSION_FK_COMPONENT), eq("component_id"), + eq(UpgradeCatalog250.COMPONENT_TABLE), eq("id"), eq(false)); dbAccessor.addFKConstraint(eq(UpgradeCatalog250.COMPONENT_VERSION_TABLE), - eq(UpgradeCatalog250.COMPONENT_VERSION_FK_REPO_VERSION), eq("repo_version_id"), - eq("repo_version"), eq("repo_version_id"), eq(false)); + eq(UpgradeCatalog250.COMPONENT_VERSION_FK_REPO_VERSION), eq("repo_version_id"), + eq("repo_version"), eq("repo_version_id"), eq(false)); // servicedesiredstate table Capture<DBAccessor.DBColumnInfo> capturedCredentialStoreSupportedCol = newCapture(); @@ -210,17 +220,27 @@ public class UpgradeCatalog250Test { @Test public void testExecuteDMLUpdates() throws Exception { Method updateAmsConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateAMSConfigs"); + Method createRoleAuthorizations = UpgradeCatalog250.class.getDeclaredMethod("createRoleAuthorizations"); + Method updateKafkaConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateKafkaConfigs"); Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml"); UpgradeCatalog250 upgradeCatalog250 = createMockBuilder(UpgradeCatalog250.class) - .addMockedMethod(updateAmsConfigs) - .addMockedMethod(addNewConfigurationsFromXml) - .createMock(); + .addMockedMethod(updateAmsConfigs) + .addMockedMethod(createRoleAuthorizations) + .addMockedMethod(updateKafkaConfigs) + .addMockedMethod(addNewConfigurationsFromXml) + .createMock(); + upgradeCatalog250.updateAMSConfigs(); + expectLastCall().once(); + upgradeCatalog250.addNewConfigurationsFromXml(); expectLastCall().once(); + upgradeCatalog250.updateKafkaConfigs(); + expectLastCall().once(); + replay(upgradeCatalog250); upgradeCatalog250.executeDMLUpdates(); @@ -229,7 +249,7 @@ public class UpgradeCatalog250Test { } @Test - public void testAmsEnvUpdateConfigs() throws Exception{ + public void testAmsEnvUpdateConfigs() throws Exception { Map<String, String> oldPropertiesAmsEnv = new HashMap<String, String>() { { @@ -278,7 +298,7 @@ public class UpgradeCatalog250Test { AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) .addMockedMethod("createConfiguration") - .addMockedMethod("getClusters", new Class[] { }) + .addMockedMethod("getClusters", new Class[]{}) .addMockedMethod("createConfig") .withConstructor(createNiceMock(ActionManager.class), clusters, injector) .createNiceMock(); @@ -298,4 +318,138 @@ public class UpgradeCatalog250Test { Map<String, String> updatedProperties = propertiesCapture.getValue(); assertTrue(Maps.difference(newPropertiesAmsEnv, updatedProperties).areEqual()); } + + @Test + public void testCreateRoleAuthorizations() throws AmbariException, SQLException { + + EasyMockSupport easyMockSupport = new EasyMockSupport(); + + ResourceTypeEntity ambariResourceTypeEntity = easyMockSupport.createMock(ResourceTypeEntity.class); + + ResourceTypeEntity clusterResourceTypeEntity = easyMockSupport.createMock(ResourceTypeEntity.class); + + Collection<RoleAuthorizationEntity> ambariAdministratorAuthorizations = new ArrayList<RoleAuthorizationEntity>(); + Collection<RoleAuthorizationEntity> clusterAdministratorAuthorizations = new ArrayList<RoleAuthorizationEntity>(); + + PermissionEntity clusterAdministratorPermissionEntity = easyMockSupport.createMock(PermissionEntity.class); + expect(clusterAdministratorPermissionEntity.getAuthorizations()) + .andReturn(clusterAdministratorAuthorizations) + .times(1); + + PermissionEntity ambariAdministratorPermissionEntity = easyMockSupport.createMock(PermissionEntity.class); + expect(ambariAdministratorPermissionEntity.getAuthorizations()) + .andReturn(ambariAdministratorAuthorizations) + .times(2); + + PermissionDAO permissionDAO = easyMockSupport.createMock(PermissionDAO.class); + expect(permissionDAO.findPermissionByNameAndType("AMBARI.ADMINISTRATOR", ambariResourceTypeEntity)) + .andReturn(ambariAdministratorPermissionEntity) + .times(2); + expect(permissionDAO.findPermissionByNameAndType("CLUSTER.ADMINISTRATOR", clusterResourceTypeEntity)) + .andReturn(clusterAdministratorPermissionEntity) + .times(1); + expect(permissionDAO.merge(ambariAdministratorPermissionEntity)) + .andReturn(ambariAdministratorPermissionEntity) + .times(2); + expect(permissionDAO.merge(clusterAdministratorPermissionEntity)) + .andReturn(clusterAdministratorPermissionEntity) + .times(1); + + ResourceTypeDAO resourceTypeDAO = easyMockSupport.createMock(ResourceTypeDAO.class); + expect(resourceTypeDAO.findByName("AMBARI")).andReturn(ambariResourceTypeEntity).times(2); + expect(resourceTypeDAO.findByName("CLUSTER")).andReturn(clusterResourceTypeEntity).times(1); + + RoleAuthorizationDAO roleAuthorizationDAO = easyMockSupport.createMock(RoleAuthorizationDAO.class); + expect(roleAuthorizationDAO.findById("CLUSTER.RUN_CUSTOM_COMMAND")).andReturn(null).times(1); + expect(roleAuthorizationDAO.findById("AMBARI.RUN_CUSTOM_COMMAND")).andReturn(null).times(1); + + Capture<RoleAuthorizationEntity> captureClusterRunCustomCommandEntity = newCapture(); + roleAuthorizationDAO.create(capture(captureClusterRunCustomCommandEntity)); + expectLastCall().times(1); + + Capture<RoleAuthorizationEntity> captureAmbariRunCustomCommandEntity = newCapture(); + roleAuthorizationDAO.create(capture(captureAmbariRunCustomCommandEntity)); + expectLastCall().times(1); + + Injector injector = easyMockSupport.createNiceMock(Injector.class); + expect(injector.getInstance(RoleAuthorizationDAO.class)).andReturn(roleAuthorizationDAO).atLeastOnce(); + expect(injector.getInstance(PermissionDAO.class)).andReturn(permissionDAO).atLeastOnce(); + expect(injector.getInstance(ResourceTypeDAO.class)).andReturn(resourceTypeDAO).atLeastOnce(); + + easyMockSupport.replayAll(); + new UpgradeCatalog250(injector).createRoleAuthorizations(); + easyMockSupport.verifyAll(); + + RoleAuthorizationEntity ambariRunCustomCommandEntity = captureAmbariRunCustomCommandEntity.getValue(); + RoleAuthorizationEntity clusterRunCustomCommandEntity = captureClusterRunCustomCommandEntity.getValue(); + + Assert.assertEquals("AMBARI.RUN_CUSTOM_COMMAND", ambariRunCustomCommandEntity.getAuthorizationId()); + Assert.assertEquals("Perform custom administrative actions", ambariRunCustomCommandEntity.getAuthorizationName()); + + Assert.assertEquals("CLUSTER.RUN_CUSTOM_COMMAND", clusterRunCustomCommandEntity.getAuthorizationId()); + Assert.assertEquals("Perform custom cluster-level actions", clusterRunCustomCommandEntity.getAuthorizationName()); + + Assert.assertEquals(2, ambariAdministratorAuthorizations.size()); + Assert.assertTrue(ambariAdministratorAuthorizations.contains(clusterRunCustomCommandEntity)); + Assert.assertTrue(ambariAdministratorAuthorizations.contains(ambariRunCustomCommandEntity)); + + Assert.assertEquals(1, clusterAdministratorAuthorizations.size()); + Assert.assertTrue(clusterAdministratorAuthorizations.contains(clusterRunCustomCommandEntity)); + } + + @Test + public void testKafkaUpdateConfigs() throws Exception { + + Map<String, String> oldProperties = new HashMap<String, String>() { + { + put("kafka.timeline.metrics.host", "{{metric_collector_host}}"); + put("kafka.timeline.metrics.port", "{{metric_collector_port}}"); + } + }; + Map<String, String> newProperties = new HashMap<String, String>() { + { + put("kafka.timeline.metrics.port", "{{metric_collector_port}}"); + } + }; + EasyMockSupport easyMockSupport = new EasyMockSupport(); + + Clusters clusters = easyMockSupport.createNiceMock(Clusters.class); + final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class); + Config mockKafkaBroker = easyMockSupport.createNiceMock(Config.class); + + expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ + put("normal", cluster); + }}).once(); + expect(cluster.getDesiredConfigByType("kafka-broker")).andReturn(mockKafkaBroker).atLeastOnce(); + expect(mockKafkaBroker.getProperties()).andReturn(oldProperties).anyTimes(); + + Injector injector = easyMockSupport.createNiceMock(Injector.class); + expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes(); + expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes(); + expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes(); + + replay(injector, clusters, mockKafkaBroker, cluster); + + AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class) + .addMockedMethod("createConfiguration") + .addMockedMethod("getClusters", new Class[]{}) + .addMockedMethod("createConfig") + .withConstructor(createNiceMock(ActionManager.class), clusters, injector) + .createNiceMock(); + + Injector injector2 = easyMockSupport.createNiceMock(Injector.class); + Capture<Map> propertiesCapture = EasyMock.newCapture(); + + expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes(); + expect(controller.getClusters()).andReturn(clusters).anyTimes(); + expect(controller.createConfig(anyObject(Cluster.class), anyString(), capture(propertiesCapture), anyString(), + anyObject(Map.class))).andReturn(createNiceMock(Config.class)).once(); + + replay(controller, injector2); + new UpgradeCatalog250(injector2).updateKafkaConfigs(); + easyMockSupport.verifyAll(); + + Map<String, String> updatedProperties = propertiesCapture.getValue(); + assertTrue(Maps.difference(newProperties, updatedProperties).areEqual()); + } }
