abeliangroupie opened a new issue, #19660: URL: https://github.com/apache/pulsar/issues/19660
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

**OS version:** Ubuntu 22.04

**Pulsar version:** 2.11.0 (using `apachepulsar/pulsar-all:2.11.0` Docker images)

### Minimal reproduce step

It's not the cleanest, but here's a Docker Compose I've been working with. Note that it spins up 3 etcd + 3 bookie + 1 broker container.

```yaml
version: "3.9"

services:
  # etcd cluster.
  etcd1:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd1
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node1
      --initial-advertise-peer-urls http://etcd1:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd1:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  etcd2:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd2
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node2
      --initial-advertise-peer-urls http://etcd2:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd2:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  etcd3:
    image: quay.io/coreos/etcd:v3.5.7
    container_name: etcd3
    restart: always
    networks:
      - pulsar
    command: >
      /usr/local/bin/etcd
      --name node3
      --initial-advertise-peer-urls http://etcd3:2380
      --listen-peer-urls http://0.0.0.0:2380
      --advertise-client-urls http://etcd3:2379
      --listen-client-urls http://0.0.0.0:2379
      --initial-cluster "node1=http://etcd1:2380,node2=http://etcd2:2380,node3=http://etcd3:2380"
      --initial-cluster-state new
      --initial-cluster-token initial-token

  # Container that only runs once to initialise metadata.
  bootstrap:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bootstrap
    depends_on:
      - etcd1
      - etcd2
      - etcd3
    networks:
      - pulsar
    command: >
      bin/pulsar initialize-cluster-metadata \
      --cluster my-pulsar \
      --metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 \
      --configuration-metadata-store etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 \
      --web-service-url http://broker1:8080 \
      --broker-service-url pulsar://broker1:6650 \

  # BookKeeper cluster.
  bookie1:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie1
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  bookie2:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie2
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  bookie3:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: bookie3
    restart: always
    depends_on:
      bootstrap:
        condition: service_completed_successfully
    networks:
      - pulsar
    volumes:
      - ${PWD}/bookkeeper.conf:/pulsar/conf/bookkeeper.conf
    command: bin/pulsar bookie

  # Broker.
  broker:
    image: apachepulsar/pulsar-all:2.11.0
    container_name: broker
    restart: always
    depends_on:
      - bookie1
      - bookie2
      - bookie3
    networks:
      - pulsar
    volumes:
      - ${PWD}/broker.conf:/pulsar/conf/broker.conf
      - ${PWD}/functions_worker.yml:/pulsar/conf/functions_worker.yml
    command: bin/pulsar broker

networks:
  pulsar:
    driver: bridge
```

Config changes are as follows:

**bookkeeper.conf**:

```ini
useHostNameAsBookieID=true
metadataServiceUri=metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
```

**broker.conf**:

```ini
clusterName=my-pulsar
metadataStoreUrl=etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
functionsWorkerEnabled=true
```

**functions_worker.yml**

```yaml
pulsarFunctionsCluster: my-pulsar
configurationMetadataStoreUrl: metadata-store:etcd:http://etcd1:2379,http://etcd2:2379,http://etcd3:2379
```

To trigger the error, from inside the `broker` container:

- Create a Pulsar function file `test_function.py` (or alternatively, `touch test_function.py` will suffice for triggering the error)
- Run:

```
$ bin/pulsar-admin functions create \
    --py test_function.py \
    --classname test_function.ExamplePulsarFunction \
    --tenant public \
    --namespace default \
    --name test-function \
    --inputs persistent://public/default/test-input-topic \
    --output persistent://public/default/test-output-topic
```

### What did you expect to see?

The Pulsar function successfully being added (or, with the test command using `touch` above, Pulsar complaining that `test_function.ExamplePulsarFunction` doesn't exist).

### What did you see instead?

The error seen is:

```
Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
Reason: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
```

Traceback from the `broker` image:

```
2023-02-28T05:49:23,816+0000 [pulsar-web-38-11] ERROR org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Failed process Function public/default/test-function package:
java.lang.NullPointerException: Cannot invoke "org.apache.distributedlog.api.namespace.Namespace.logExists(String)" because "dlogNamespace" is null
    at org.apache.pulsar.functions.worker.WorkerUtils.uploadToBookKeeper(WorkerUtils.java:90) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.WorkerUtils.uploadFileToBookkeeper(WorkerUtils.java:80) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.rest.api.ComponentImpl.getFunctionPackageLocation(ComponentImpl.java:416) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:240) ~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
    at org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:200) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:67) ~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[org.eclipse.jetty-jetty-servlets-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) ~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) ~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
```

### Anything else?

I suspect the main issue is that parts of the dlog (DistributedLog) setup are still ZooKeeper-specific. For example:

- `initInBroker()` in `PulsarWorkerService` only sets `dlogURI` if `brokerConfig.isMetadataStoreBackedByZookeeper()` is true.
- `initializeCluster()` in `PulsarClusterMetadataSetup` (the code behind `bin/pulsar initialize-cluster-metadata`) only initializes the dlog namespace if `localStore instanceof ZKMetadataStore && configStore instanceof ZKMetadataStore` is true. As a result, setting `initializedDlogMetadata: true` in `functions_worker.yml` doesn't help, since the metadata is never actually set up.

Taken together, this means that with an etcd metadata store, by the time `uploadToBookKeeper()` is called in `WorkerUtils`, `dlogNamespace` is null and `dlogNamespace.logExists()` fails with the `NullPointerException` shown in the traceback.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
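For anyone reproducing this with a real function rather than an empty file, a minimal `test_function.py` matching the `--classname test_function.ExamplePulsarFunction` argument from the reproduce step could look roughly like the sketch below. The function body (appending `!` to each message) is purely illustrative and is not what was used for this report, and the `ImportError` fallback is an assumption added only so the file can be exercised outside a Pulsar container.

```python
# test_function.py
# Illustrative stand-in for the function file named in the reproduce step.
try:
    # Provided by the Pulsar Python client, which ships in the pulsar-all image.
    from pulsar import Function
except ImportError:
    # Assumption: plain-object fallback so the class can be tried without Pulsar installed.
    Function = object


class ExamplePulsarFunction(Function):
    """Appends '!' to each input message (illustrative behavior only)."""

    def process(self, input, context):
        # `context` is unused here; Pulsar passes it to every invocation.
        return f"{input}!"
```

Since the error is raised while the package is being uploaded, the contents of the file should not affect whether the `NullPointerException` appears.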
