Repository: ambari Updated Branches: refs/heads/branch-2.4 5d4ea49b9 -> f7fe36477
AMBARI-16027. Kafka upgrade from HDP 2.2 to HDP 2.3 is breaking (Sriharsha Chintalapani via alejandro) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f7fe3647 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f7fe3647 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f7fe3647 Branch: refs/heads/branch-2.4 Commit: f7fe36477fd1fe4bc67eed9c61b13963a8d757a4 Parents: 5d4ea49 Author: Alejandro Fernandez <[email protected]> Authored: Wed Jul 20 12:14:28 2016 -0700 Committer: Alejandro Fernandez <[email protected]> Committed: Wed Jul 20 12:14:28 2016 -0700 ---------------------------------------------------------------------- .../KAFKA/0.8.1/package/scripts/kafka.py | 63 +++++++++++++------- .../KAFKA/0.9.0/configuration/kafka-broker.xml | 7 --- .../stacks/HDP/2.2/upgrades/config-upgrade.xml | 18 ------ .../stacks/HDP/2.2/upgrades/upgrade-2.3.xml | 6 +- .../stacks/HDP/2.2/upgrades/upgrade-2.4.xml | 6 +- .../stacks/HDP/2.3/upgrades/upgrade-2.4.xml | 4 +- 6 files changed, 51 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f7fe3647/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py index 33275f9..ac7b0ae 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py @@ -28,6 +28,7 @@ from resource_management.core.source import StaticFile, Template, InlineTemplate from resource_management.libraries.functions import format from resource_management.libraries.functions.stack_features import check_stack_feature from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions import Direction from resource_management.core.logger import Logger @@ -44,34 +45,56 @@ def kafka(upgrade_type=None): effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version) Logger.info(format("Effective stack version: {effective_version}")) + # In HDP-2.2 (Apache Kafka 0.8.1.1) we used to generate broker.ids based on hosts and add them to + # kafka's server.properties. In future version brokers can generate their own ids based on zookeeper seq + # We need to preserve the broker.id when user is upgrading from HDP-2.2 to any higher version. + # Once its preserved it will be written to kafka.log.dirs/meta.properties and it will be used from there on + # similarly we need preserve port as well during the upgrade + + if upgrade_type is not None and params.upgrade_direction == Direction.UPGRADE and \ + check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, params.current_version) and \ + check_stack_feature(StackFeature.KAFKA_LISTENERS, params.version): + if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts: + brokerid = str(sorted(params.kafka_hosts).index(params.hostname)) + kafka_server_config['broker.id'] = brokerid + Logger.info(format("Calculating broker.id as {brokerid}")) + if 'port' in kafka_server_config: + port = kafka_server_config['port'] + Logger.info(format("Port config from previous verson: {port}")) + listeners = kafka_server_config['listeners'] + kafka_server_config['listeners'] = listeners.replace("6667", port) + Logger.info(format("Kafka listeners after the port update: {listeners}")) + del kafka_server_config['port'] + + if effective_version is not None and effective_version != "" and \ check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version): if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts: brokerid = str(sorted(params.kafka_hosts).index(params.hostname)) kafka_server_config['broker.id'] = brokerid Logger.info(format("Calculating broker.id as {brokerid}")) - + # listeners and advertised.listeners are only added in 2.3.0.0 onwards. if effective_version is not None and effective_version != "" and \ - check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version): - listeners = kafka_server_config['listeners'].replace("localhost", params.hostname) - Logger.info(format("Kafka listeners: {listeners}")) - - if params.security_enabled and params.kafka_kerberos_enabled: - Logger.info("Kafka kerberos security is enabled.") - if "SASL" not in listeners: - listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL") - - kafka_server_config['listeners'] = listeners - kafka_server_config['advertised.listeners'] = listeners - Logger.info(format("Kafka advertised listeners: {listeners}")) - else: - kafka_server_config['listeners'] = listeners - - if 'advertised.listeners' in kafka_server_config: - advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname) - kafka_server_config['advertised.listeners'] = advertised_listeners - Logger.info(format("Kafka advertised listeners: {advertised_listeners}")) + check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version): + + listeners = kafka_server_config['listeners'].replace("localhost", params.hostname) + Logger.info(format("Kafka listeners: {listeners}")) + + if params.security_enabled and params.kafka_kerberos_enabled: + Logger.info("Kafka kerberos security is enabled.") + if "SASL" not in listeners: + listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL") + + kafka_server_config['listeners'] = listeners + kafka_server_config['advertised.listeners'] = listeners + Logger.info(format("Kafka advertised listeners: {listeners}")) + else: + kafka_server_config['listeners'] = listeners + if 'advertised.listeners' in kafka_server_config: + advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname) + kafka_server_config['advertised.listeners'] = advertised_listeners + Logger.info(format("Kafka advertised listeners: {advertised_listeners}")) else: kafka_server_config['host.name'] = params.hostname http://git-wip-us.apache.org/repos/asf/ambari/blob/f7fe3647/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml index 3b2ab97..0275358 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/configuration/kafka-broker.xml @@ -137,13 +137,6 @@ <on-ambari-upgrade add="true"/> </property> <property> - <name>port</name> - <value>6667</value> - <description>Deprecated config in favor of listeners config.</description> - <deleted>true</deleted> - <on-ambari-upgrade add="false"/> - </property> - <property> <name>external.kafka.metrics.exclude.prefix</name> <value>kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec</value> <description> http://git-wip-us.apache.org/repos/asf/ambari/blob/f7fe3647/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml index f7405c1..95310d5 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml @@ -1073,24 +1073,6 @@ </component> </service> - <service name="KAFKA"> - <component name="KAFKA_BROKER"> - <changes> - <definition xsi:type="configure" id="hdp_2_3_0_0_kafka_broker_deprecate_port"> - <type>kafka-broker</type> - <!-- Deprecate "port" property since "listeners" will be added. --> - <transfer operation="delete" delete-key="port"/> - </definition> - - <definition xsi:type="configure" id="hdp_2_4_0_0_kafka_broker_deprecate_port"> - <type>kafka-broker</type> - <!-- Deprecate "port" property since "listeners" will be added. --> - <transfer operation="delete" delete-key="port"/> - </definition> - </changes> - </component> - </service> - <service name="KNOX"> <component name="KNOX_GATEWAY"> <changes> http://git-wip-us.apache.org/repos/asf/ambari/blob/f7fe3647/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml index a406758..da2d891 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml @@ -846,13 +846,13 @@ <script>scripts/kafka_broker.py</script> <function>stop</function> </task> - - <task xsi:type="configure" id="hdp_2_3_0_0_kafka_broker_deprecate_port"/> </pre-upgrade> - <upgrade> <task xsi:type="restart-task"/> </upgrade> + <post-upgrade> + <task xsi:type="configure" id="hdp_2_3_0_0_kafka_broker_deprecate_port"/> + </post-upgrade> </component> </service> http://git-wip-us.apache.org/repos/asf/ambari/blob/f7fe3647/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.4.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.4.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.4.xml index 72a3018..bda0995 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.4.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.4.xml @@ -859,13 +859,13 @@ <script>scripts/kafka_broker.py</script> <function>stop</function> </task> - - <task xsi:type="configure" id="hdp_2_4_0_0_kafka_broker_deprecate_port"/> </pre-upgrade> - <upgrade> <task xsi:type="restart-task"/> </upgrade> + <post-upgrade> + <task xsi:type="configure" id="hdp_2_4_0_0_kafka_broker_deprecate_port"/> + </post-upgrade> </component> </service> http://git-wip-us.apache.org/repos/asf/ambari/blob/f7fe3647/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml index 27e7e92..f16a9da 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml @@ -481,9 +481,9 @@ <service name="KAFKA"> <component name="KAFKA_BROKER"> - <pre-upgrade> + <post-upgrade> <task xsi:type="configure" id ="hdp_2_4_0_0_kafka_broker_deprecate_port"/> - </pre-upgrade> + </post-upgrade> <!-- no-op to prevent config changes on downgrade --> <pre-downgrade/>
