Raven888888 opened a new issue, #17024:
URL: https://github.com/apache/pulsar/issues/17024

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   Ubuntu 20.04
   Pulsar 2.7.0
   
   Cluster setup:
   5 ZooKeeper nodes
   5 BookKeeper nodes (bookies)
   5 brokers
    - with Pulsar Functions and WebSocket enabled
    - 4 of the 5 are behind a proxy; the fifth is isolated from external 
connections so that it can focus on running Pulsar Functions
   
   Each node has at least 64 GB RAM and a CPU with 16 physical cores.
   
   ### Minimal reproduce step
   
   Start the cluster; it should run stably at first.
   
   Create 20k WebSocket connections to the proxy, so that each of the 4 brokers 
behind the proxy holds 5k WebSocket connections.
   Each connection creates a new topic, for a total of 20k topics.
   Send data at about 1k messages per second.
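
The load shape in these steps can be sketched as a small planning helper. This is only an illustration, not the actual client we used: the proxy host, the `loadtest` namespace, and the `topic-NNNNN` naming are hypothetical, the 1k messages per second is assumed to be the aggregate rate across all connections, and connections are assumed to spread evenly over the 4 brokers. The endpoint path follows Pulsar's documented WebSocket producer URL, and the frame uses the Base64 `payload` field that API expects.

```python
import base64
import json

# Hypothetical values for illustration; none of them come from the report.
PROXY_WS_BASE = "ws://pulsar-proxy.example.com:8080"
TENANT = "public"
NAMESPACE = "loadtest"


def producer_url(topic: str) -> str:
    """Build the WebSocket producer endpoint for one persistent topic."""
    return (
        f"{PROXY_WS_BASE}/ws/v2/producer/persistent/"
        f"{TENANT}/{NAMESPACE}/{topic}"
    )


def publish_frame(data: bytes) -> str:
    """Build the JSON text frame for one message (payload is Base64)."""
    return json.dumps({"payload": base64.b64encode(data).decode("ascii")})


def plan(connections: int, brokers: int, total_rate: float):
    """Return one URL per connection plus per-broker and per-connection load.

    Assumes an even spread of connections over the brokers and that
    total_rate is the aggregate message rate (both are assumptions).
    """
    urls = [producer_url(f"topic-{i:05d}") for i in range(connections)]
    connections_per_broker = connections / brokers
    rate_per_connection = total_rate / connections
    return urls, connections_per_broker, rate_per_connection


if __name__ == "__main__":
    urls, per_broker, per_conn = plan(20_000, 4, 1_000.0)
    print(len(urls), per_broker, per_conn)
    print(urls[0])
    print(publish_frame(b"hello"))
```

Each URL would be opened with a WebSocket client of your choice and fed `publish_frame(...)` text frames at the per-connection rate.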
   
   
   ### What did you expect to see?
   
   Brokers should handle the load well, and not die *collectively* (that's the 
whole point of clustering!).
   
   ### What did you see instead?
   
   Our cluster ran OK for slightly more than a day. We were using it with 
Apache Flink, and we occasionally saw this error from Flink:
   
   ```
   java.lang.RuntimeException: Failed to get schema information for 
persistent://public/xxxxx/xxxxxxxx
        at 
org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils.uploadPulsarSchema(SchemaUtils.java:64)
        at 
org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.uploadSchema(FlinkPulsarSinkBase.java:302)
        at 
org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.getProducer(FlinkPulsarSinkBase.java:320)
        at 
org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.invoke(FlinkPulsarSink.java:103)
        at 
org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.invoke(FlinkPulsarSink.java:41)
        at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:64)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: 
org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: 
HTTP 500 Internal Server Error
        at 
org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:209)
        at 
org.apache.pulsar.client.admin.internal.SchemasImpl$1.failed(SchemasImpl.java:85)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:839)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:820)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:244)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173)
        at 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$3(AsyncHttpConnector.java:253)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222)
        at 
org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257)
        at 
org.apache.pulsar.shade.org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241)
        at 
org.apache.pulsar.shade.org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114)
        at 
org.apache.pulsar.shade.org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143)
        at 
org.apache.pulsar.shade.org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at 
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at 
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at 
org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
   Caused by: org.apache.pulsar.shade.javax.ws.rs.InternalServerErrorException: 
HTTP 500 Internal Server Error
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:914)
        at 
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.access$500(JerseyInvocation.java:77)
        ... 54 more
   ```
   
   But it seemed pretty harmless: the Flink job was able to run after a 
restart, and all brokers were healthy.
   
   At some point after a day, when the load started peaking, all 4 brokers 
began showing these logs:
   ```
   ...
   [pulsar-client-io-63-6] ERROR org.apache.pulsar.client.impl.ProducerImpl - [xxxx] [null] Failed to create producer: java.io.IOException: Error while using ZooKeeper -  ledger=24 - operation=Failed to open ledger
   ...
   [pulsar-client-io-63-1]  ERROR org.apache.pulsar.client.impl.ProducerImpl - [xxxx] [xxxx] Failed to create producer: Disconnected from server at xxxxx
   ```
   All 4 nodes are unhealthy at this stage, and they cannot recover from this 
state without manual intervention.
   
   After we manually restart the brokers, the cluster is back to normal again.
   
   
   ### Anything else?
   
   We have had serious instability issues with our 3-node production Pulsar 
cluster. We recently expanded it to 5 nodes, hoping it would be more stable. 
Unfortunately, the brokers still become unhealthy and cannot recover. 
Manual intervention is costly and not feasible in a production environment.
   
   Why do the brokers become unhealthy collectively? How can we prevent that?
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

