JarvisZhu opened a new issue #14227: URL: https://github.com/apache/pulsar/issues/14227
pulsar集群1:10.66.107.31/32/33,每台服务器上一个broker实例、一个bookie实例 local zookeeper集群:10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183 pulsar集群2:10.66.107.37/38/39,每台服务器上一个broker实例、一个bookie实例 local zookeeper集群:10.66.107.36:2181,10.66.107.36:2182,10.66.107.36:2183 共享的存储配置Zookeeper集群:10.66.107.35:2181,10.66.107.35:2182,10.66.107.35:2183 集群1的配置如下(以31为例): broker.conf: zookeeperServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183 configurationStoreServers=10.66.107.35:2181,10.66.107.35:2182,10.66.107.35:2183 brokerServicePortTls=6651 webServicePortTls=8443 advertisedAddress=10.66.107.31 clusterName=pulsar-cluster-1 functionsWorkerEnabled=true bookkeeper.conf advertisedAddress=10.66.107.31 bookieId=31 zkServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183 httpServerEnabled=true functions_worker.yml workerId: 31 workerHostname: 10.66.107.31 configurationStoreServers: 10.66.107.34:2181 pulsarFunctionsCluster: pulsar-cluster-1 stateStorageServiceUrl: bk://localhost:4181 pulsarFunctionsNamespace: public/functions1 集群2的配置如下(以37为例): broker.conf zookeeperServers=10.66.107.36:2181,10.66.107.36:2182,10.66.107.36:2183 configurationStoreServers=10.66.107.35:2181,10.66.107.35:2182,10.66.107.35:2183 brokerServicePortTls=6651 webServicePortTls=8443 advertisedAddress=10.66.107.37 clusterName=pulsar-cluster-2 functionsWorkerEnabled=true bookkeeper.conf advertisedAddress=10.66.107.37 bookieId=37 zkServers=10.66.107.36:2181,10.66.107.36:2182,10.66.107.36:2183 httpServerEnabled=true functions_worker.yml workerId: 37 workerHostname: 10.66.107.37 configurationStoreServers: 10.66.107.36:2181 pulsarFunctionsCluster: pulsar-cluster-2 stateStorageServiceUrl: bk://localhost:4181 pulsarFunctionsNamespace: public/functions2 问题1: 两个集群一共6个broker.conf中的functionsWorkerEnabled=true。 第一个集群bookie、broker启动正常,第二个集群的三个bookie启动正常,但是三个broker启动报错(后将functionsWorkerEnabled全部改为false后正常): 15:17:55.899 [ForkJoinPool.commonPool-worker-1] WARN org.apache.pulsar.broker.web.PulsarWebResource - Namespace missing local cluster name in clusters list: local_cluster=pulsar-cluster-2 ns=public/functions clusters=[pulsar-cluster-1] 15:17:55.924 [pulsar-web-40-15] INFO org.eclipse.jetty.server.RequestLog - 10.66.107.37 - - [26/Jan/2022:15:17:55 +0800] "PUT /admin/v2/persistent/public/functions/assignments HTTP/1.1" 412 60 "-" "Pulsar-Java-v2.8.0" 139 15:17:55.933 [AsyncHttpClient-57-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://10.66.107.37:8080/admin/v2/persistent/public/functions/assignments] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed 15:17:55.944 [main] ERROR org.apache.pulsar.functions.worker.PulsarWorkerService - Error Starting up in worker org.apache.pulsar.client.admin.PulsarAdminException$PreconditionFailedException: Namespace does not have any clusters configured at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:236) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0] at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0] at org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:882) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:863) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_131] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_131] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131] at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:254) ~[org.apache.pulsar-pulsar-client-admin-original-2.8.0.jar:2.8.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_131] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_131] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131] at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[io.netty-netty-codec-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[io.netty-netty-transport-4.1.63.Final.jar:4.1.63.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131] Caused by: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed at org.glassfish.jersey.client.JerseyInvocation.createExceptionForFamily(JerseyInvocation.java:985) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:967) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] at org.glassfish.jersey.client.JerseyInvocation.access$700(JerseyInvocation.java:82) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?] ... 54 more 问题2: 将functionsWorkerEnabled全部改为false后正常,但是在集群1某个节点上执行如下命令配置从pulsar-cluster-1到pulsar-cluster-2的跨地域复制时: bin/pulsar-admin clusters create \ --broker-url pulsar://10.66.107.37:6650,10.66.107.38:6650,10.66.107.39:6650 \ --url http://10.66.107.37:8080,10.66.107.38:8080,10.66.107.39:8080 \ pulsar-cluster-2 报错: 22:26:57.207 [AsyncHttpClient-7-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://10.66.107.32:8080/admin/v2/clusters/pulsar-cluster-2] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 409 Conflict Cluster already exists Reason: Cluster already exists 难道按照上述步骤搭建完之后,自动就是双向的全连通复制了?如果是这样的话,单向复制模式和failover模式该如何搭建呢? 问题3: # 测试结果: 1.使用命令行在两个集群中发送消息:先在集群1上进行消费,消费完之后连到集群2上重启Java代码消费者,没有重复消费; 2.使用命令行在两个集群中发送消息:先在集群2上进行消费,消费完之后连到集群1上重启Java代码消费者,有重复消费; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
