This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea2798  [pulsar-broker] add flag to skip broker shutdown on transient 
OOM (#6634)
2ea2798 is described below

commit 2ea2798219945e7898ca1145e9d1d45fe682980c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Mar 30 14:59:12 2020 -0700

    [pulsar-broker] add flag to skip broker shutdown on transient OOM (#6634)
    
    ### Motivation
    
    Sometimes a high dispatch rate on a single topic can temporarily drive the 
broker out of memory. The error is transient: the broker can recover within a 
few seconds, as soon as some memory is released. However, the 2.4 release 
includes change #4196, which restarts the broker on OOM. That can cause major 
instability in a cluster, because the topic moves from one broker to another, 
restarts multiple brokers, and disrupts other topics as well. We have seen a 
similar kind of issue mentione [...]
    ```
    01:48:49.549 [pulsar-io-22-37] ERROR org.apache.pulsar.PulsarBrokerStarter 
- -- Shutting down - Received OOM exception: Direct buffer memory
    java.lang.OutOfMemoryError: Direct buffer memory
            at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
            at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) 
~[?:?]
            at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
            at 
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
 ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158)
 ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265)
 ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
            at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
 [netty-all-4.1.32.Final.jar:4.1.32.
    :
    :
    01:48:49.549 [pulsar-io-22-39] ERROR org.apache.pulsar.PulsarBrokerStarter 
- -- Shutting down - Received OOM exception: Direct buffer memory
    java.lang.OutOfMemoryError: Direct buffer memory
            at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
            at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) 
~[?:?]
            at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
            at 
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
 [bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158)
 [bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
    ```
    
    ### Modification
    Add dynamic flag to avoid broker shutdown on OOM.
---
 conf/broker.conf                                                  | 3 +++
 conf/standalone.conf                                              | 3 +++
 deployment/terraform-ansible/templates/broker.conf                | 3 +++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java  | 6 ++++++
 .../src/main/java/org/apache/pulsar/PulsarBrokerStarter.java      | 8 ++++++--
 site2/docs/reference-configuration.md                             | 2 ++
 6 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1a96ff1..2ded719 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -67,6 +67,9 @@ zooKeeperOperationTimeoutSeconds=30
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
+# Flag to skip broker shutdown when broker handles Out of memory error
+skipBrokerShutdownOnOOM=false
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cf32ba3..9b3b615 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -57,6 +57,9 @@ zooKeeperOperationTimeoutSeconds=30
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
+# Flag to skip broker shutdown when broker handles Out of memory error
+skipBrokerShutdownOnOOM=false
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 1f78a35..8da1a4a 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -55,6 +55,9 @@ zooKeeperSessionTimeoutMillis=30000
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
+# Flag to skip broker shutdown when broker handles Out of memory error
+skipBrokerShutdownOnOOM=false
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 190170d..239442e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -213,6 +213,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
     )
     private long brokerShutdownTimeoutMs = 60000;
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "Flag to skip broker shutdown when broker handles Out of memory error"
+    )
+    private boolean skipBrokerShutdownOnOOM = false;
 
     @FieldContext(
         category = CATEGORY_POLICIES,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 37c5a1f..42a2cf1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -331,8 +331,12 @@ public class PulsarBrokerStarter {
         );
 
         PulsarByteBufAllocator.registerOOMListener(oomException -> {
-            log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
-            starter.shutdown();
+            if (starter.brokerConfig.isSkipBrokerShutdownOnOOM()) {
+                log.error("-- Received OOM exception: {}", oomException.getMessage(), oomException);
+            } else {
+                log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
+                starter.shutdown();
+            }
         });
 
         try {
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index edc3f99..c4ed57a 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -126,6 +126,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |dispatchThrottlingRatePerReplicatorInByte| The default bytes per second dispatch throttling-limit for every replicator in replication. The value of `0` means disabling replication message-byte dispatch-throttling| 0 | 
 |zooKeeperSessionTimeoutMillis| Zookeeper session timeout in milliseconds |30000|
 |brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed  |60000|
+|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
 |backlogQuotaCheckEnabled|  Enable backlog quota check. Enforces action on topic when the quota is reached  |true|
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the quota |60|
 |backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 |
@@ -335,6 +336,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |clusterName| The name of the cluster that this broker belongs to. |standalone|
 |zooKeeperSessionTimeoutMillis| The ZooKeeper session timeout, in milliseconds. |30000|
 |brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
+|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
 |backlogQuotaCheckEnabled|  Enable the backlog quota check, which enforces a specified action when the quota is reached.  |true|
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the backlog quota.  |60|
 |backlogQuotaDefaultLimitGB|  The default per-topic backlog quota limit.  |10|

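For readers without a Pulsar checkout, the listener change in PulsarBrokerStarter.java can be approximated with a standalone sketch. Everything in it is a hypothetical stand-in rather than Pulsar's API: `OomListenerSketch`, the `AtomicBoolean` that plays the role of the dynamic `skipBrokerShutdownOnOOM` setting, and the `actions` list that takes the place of `log.error`. The point it illustrates is that the flag is read on each OOM event, so changing it at runtime changes the behaviour of a listener that is already registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the broker starter; these are not Pulsar's classes.
class OomListenerSketch {
    // Plays the role of the dynamic skipBrokerShutdownOnOOM setting. It is read
    // on every OOM event, so a runtime change needs no listener re-registration.
    static final AtomicBoolean skipBrokerShutdownOnOOM = new AtomicBoolean(false);

    // Records what the listener did, in place of log.error(...).
    static final List<String> actions = new ArrayList<>();

    // Builds a listener that either only records the error or also runs the
    // supplied shutdown hook, depending on the flag's value at call time.
    static Consumer<OutOfMemoryError> newOomListener(Runnable shutdown) {
        return oom -> {
            if (skipBrokerShutdownOnOOM.get()) {
                actions.add("Received OOM exception: " + oom.getMessage());
            } else {
                actions.add("Shutting down - Received OOM exception: " + oom.getMessage());
                shutdown.run();
            }
        };
    }

    public static void main(String[] args) {
        Consumer<OutOfMemoryError> listener =
                newOomListener(() -> actions.add("shutdown requested"));

        // Default (false): an OOM triggers the shutdown hook, as before the change.
        listener.accept(new OutOfMemoryError("Direct buffer memory"));

        // Flag flipped at runtime: the same listener now only records the error.
        skipBrokerShutdownOnOOM.set(true);
        listener.accept(new OutOfMemoryError("Direct buffer memory"));

        actions.forEach(System.out::println);
    }
}
```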