codelipenghui opened a new pull request #9264:
URL: https://github.com/apache/pulsar/pull/9264


   ### Motivation
   
   The NullPointerException is thrown when the zookeeper had an OOM issue. 
After we increase the zookeeper memory and restart the zookeeper cluster, the 
broker still kept throwing NullPointerException. The exception was fixed after 
rolling restart all brokers.
   
   ```
   07:54:13.142 [pulsar-io-25-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978] Subscribing on 
topic persistent://packhunt/billing/billing-records / extended-billing-records
   07:54:13.143 [Thread-241] WARN  org.apache.pulsar.broker.service.ServerCnx - 
[/10.0.168.5:42978][persistent://packhunt/billing/billing-records][extended-billing-records]
 Failed to create consumer: null
   java.util.concurrent.CompletionException: java.lang.NullPointerException
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1005)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) 
~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$getSchema$6(BookkeeperSchemaStorage.java:175)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
 ~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.getSchema(BookkeeperSchemaStorage.java:169)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.get(BookkeeperSchemaStorage.java:126)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:95)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:81)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator.getSchema(SchemaRegistryServiceWithSchemaDataValidator.java:52)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.AbstractTopic.hasSchema(AbstractTopic.java:244)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.addSchemaIfIdleOrCheckCompatible(PersistentTopic.java:2144)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:920) 
~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) 
~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:902)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) 
~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:852) 
~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:239)
 ~[org.apache.pulsar-pulsar-common-2.6.2.jar:2.6.2]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191) 
~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153)
 ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
 ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
 ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
 
~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) 
~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) 
~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
   Caused by: java.lang.NullPointerException
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.openLedger(BookkeeperSchemaStorage.java:565)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.readSchemaEntry(BookkeeperSchemaStorage.java:470)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$4(BookkeeperSchemaStorage.java:185)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
 ~[?:1.8.0_252]
        ... 43 more
   ```
   
   The problem is the bookkeeper client used by the `BookkeeperSchemaStorage` 
does not create success due to the zookeeper issue. Currently, start the broker 
will create and start the SchemaStorage, but if the SchemaStorage start failed, 
the broker only prints a log `Unable to create schema registry storage`. after 
this moment, the broker will continue the start process, if the subsequent 
steps do not throw any exceptions, the broker will start successfully, however, 
the bookkeeper client of the `BookkeeperSchemaStorage`  will always be null.
   
   ### Modifications
   
   Make sure the SchemaStorage start success when starting the broker, if 
SchemaStorage starts failed, the broker also should be start failed.
   
   ### Verifying this change
   
   New test added.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to