This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch 2.10.4/pass_subName in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a28ee0e3b34163594b9b946170f371f162d77102 Author: Qiang Huang <[email protected]> AuthorDate: Fri Aug 19 13:57:52 2022 +0800 [fix][broker]optimize the shutdown sequence of broker service when it close (#16756) (cherry picked from commit 14912a6a8db750e972de577542b937aeac467987) --- deployment/terraform-ansible/deploy-pulsar.yaml-e | 214 +++++++++++++++++++++ .../org/apache/pulsar/broker/PulsarService.java | 9 +- 2 files changed, 219 insertions(+), 4 deletions(-) diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml-e b/deployment/terraform-ansible/deploy-pulsar.yaml-e new file mode 100644 index 00000000000..ae1243ad66e --- /dev/null +++ b/deployment/terraform-ansible/deploy-pulsar.yaml-e @@ -0,0 +1,214 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +- name: Pulsar setup + hosts: zookeeper, bookie, broker, proxy + connection: ssh + become: true + tasks: + - name: Create necessary directories + file: + path: "{{ item }}" + state: directory + with_items: ["/opt/pulsar"] + - name: Install RPM packages + yum: + state: latest + name: + - wget + - java + - sysstat + - vim + - set_fact: + zookeeper_servers: "{{ groups['zookeeper']|map('extract', hostvars, ['ansible_default_ipv4', 'address'])|map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}" + service_url: "{{ pulsar_service_url }}" + http_url: "{{ pulsar_web_url }}" + pulsar_version: "2.8.1" + - name: Download Pulsar binary package + unarchive: + src: https://archive.apache.org/dist/pulsar/pulsar-{{ pulsar_version }}/apache-pulsar-{{ pulsar_version }}-bin.tar.gz + remote_src: yes + dest: /opt/pulsar + extra_opts: ["--strip-components=1"] + - set_fact: + max_heap_memory: "12g" + max_direct_memory: "12g" + cluster_name: "local" + - name: Add pulsar_env.sh configuration file + template: + src: "../templates/pulsar_env.sh" + dest: "/opt/pulsar/conf/pulsar_env.sh" + +- name: Set up ZooKeeper + hosts: zookeeper + connection: ssh + become: true + tasks: + - set_fact: + zid: "{{ groups['zookeeper'].index(inventory_hostname) }}" + max_heap_memory: "512m" + max_direct_memory: "1g" + - name: Create ZooKeeper data directory + file: + path: "/opt/pulsar/{{ item }}" + state: directory + with_items: + - data/zookeeper + - name: Add pulsar_env.sh configuration file + template: + src: "../templates/pulsar_env.sh" + dest: "/opt/pulsar/conf/pulsar_env.sh" + - name: Add zookeeper.conf file + template: + src: "../templates/zoo.cfg" + dest: "/opt/pulsar/conf/zookeeper.conf" + - name: Add myid file for ZooKeeper + template: + src: "../templates/myid" + dest: "/opt/pulsar/data/zookeeper/myid" + - name: Add zookeeper.service systemd file + template: + src: "../templates/zookeeper.service" + dest: "/etc/systemd/system/zookeeper.service" + - name: systemd ZooKeeper start + systemd: + state: restarted + daemon_reload: yes + name: "zookeeper" + - name: Initialize cluster metadata + shell: | + bin/pulsar initialize-cluster-metadata \ + --cluster {{ cluster_name }} \ + --zookeeper localhost:2181 \ + --configuration-store localhost:2181 \ + --web-service-url {{ http_url }} \ + --broker-service-url {{ service_url }} + args: + chdir: /opt/pulsar + when: groups['zookeeper'][0] == inventory_hostname + +- name: Set up bookies + hosts: bookie + connection: ssh + become: true + tasks: + - template: + src: "../templates/bookkeeper.conf" + dest: "/opt/pulsar/conf/bookkeeper.conf" + - template: + src: "../templates/bookkeeper.service" + dest: "/etc/systemd/system/bookkeeper.service" + - systemd: + state: restarted + daemon_reload: yes + name: "bookkeeper" + +- name: Set up brokers + hosts: broker + connection: ssh + become: true + tasks: + - name: Create connectors directory + file: + path: "/opt/pulsar/{{ item }}" + state: directory + loop: + - connectors + - name: Download Pulsar IO packages + get_url: + url: https://archive.apache.org/dist/pulsar/pulsar-{{ pulsar_version }}/connectors/pulsar-io-{{ item }}-{{ pulsar_version }}.nar + dest: /opt/pulsar/connectors/pulsar-io-{{ item }}-{{ pulsar_version }}.nar + loop: +# - aerospike +# - canal +# - cassandra +# - data-generator +# - debezium-mongodb +# - debezium-mysql +# - debezium-postgres +# - dynamodb +# - elastic-search +# - file +# - flume +# - hbase +# - hdfs2 +# - hdfs3 +# - influxdb +# - jdbc-clickhouse +# - jdbc-mariadb +# - jdbc-postgres +# - jdbc-sqlite + - kafka +# - kafka-connect-adaptor +# - kinesis +# - mongo +# - netty +# - rabbitmq +# - redis +# - solr +# - twitter + - name: Set up broker + template: + src: "../templates/broker.conf" + dest: "/opt/pulsar/conf/broker.conf" + - name: Set up function worker + template: + src: "../templates/functions_worker.yml" + dest: "/opt/pulsar/conf/functions_worker.yml" + - template: + src: "../templates/pulsar.broker.service" + dest: "/etc/systemd/system/pulsar.broker.service" + - systemd: + state: restarted + daemon_reload: yes + name: "pulsar.broker" + +- name: Set up Pulsar Proxy + hosts: proxy + connection: ssh + become: true + tasks: + - name: Set up proxy + template: + src: "../templates/proxy.conf" + dest: "/opt/pulsar/conf/proxy.conf" + - template: + src: "../templates/pulsar.proxy.service" + dest: "/etc/systemd/system/pulsar.proxy.service" + - systemd: + state: restarted + daemon_reload: yes + name: "pulsar.proxy" + +- name: Hosts addresses + hosts: localhost + become: false + tasks: + - debug: + msg: "Zookeeper Server {{ item }}" + with_items: "{{ groups['zookeeper'] }}" + - debug: + msg: "Bookie {{ item }}" + with_items: "{{ groups['bookie'] }}" + - debug: + msg: "Broker {{ item }}" + with_items: "{{ groups['broker'] }}" + - debug: + msg: "Proxy {{ item }}" + with_items: "{{ groups['proxy'] }}" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 490baaf6e53..5c239e73bf5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -446,6 +446,11 @@ public class PulsarService implements AutoCloseable, ShutdownService { (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * getConfiguration() .getBrokerShutdownTimeoutMs()))); + // close protocol handler before closing broker service + if (protocolHandlers != null) { + protocolHandlers.close(); + protocolHandlers = null; + } List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>(); if (this.brokerService != null) { @@ -524,10 +529,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { offloadersCache.close(); - if (protocolHandlers != null) { - protocolHandlers.close(); - protocolHandlers = null; - } if (transactionBufferClient != null) { transactionBufferClient.close();
