GitHub user gfloranc edited a discussion: Pulsar 2.11.0: need help to configure
stateful functions
Hi,
I have deployed a bare-metal Apache Pulsar cluster with 3
brokers, 3 ZooKeeper nodes and 3 bookies.
On one bookie (the one hosting the state storage service),
I have configured the Pulsar Function service as described here:
https://pulsar.apache.org/docs/2.11.x/functions-worker-stateful/
I have also added this setting to bookkeeper.conf in order to enable the
state storage service:
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
This seems to work: a telnet to port 4181 on the bookie used for function
state connects successfully.
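For anyone who wants to repeat that reachability check without telnet, here is a minimal Java sketch. It only confirms that a TCP connection to the port can be opened; it does not prove that the state storage service behind it is healthy. The class name `PortCheck`, the default host `localhost`, and the 2-second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: pass the bookie host and port as arguments instead.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 4181;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

Running it from a broker or function worker host, rather than from the bookie itself, also checks that no firewall sits between the two.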
I have created a function that counts the number of elements in each
category.
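The source of the function is not included here, so the following is only a sketch of what such a per-category counter might look like. The stack trace below shows that the real function calls `incrCounter` on the Pulsar `Context`; to keep the sketch runnable without a cluster, it uses a hypothetical `CounterState` interface and an in-memory implementation as stand-ins for that context. All names in it are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class AggregateByTypeSketch {
    /** Hypothetical stand-in for the counter methods of the Pulsar function Context. */
    interface CounterState {
        void incrCounter(String key, long amount);
        long getCounter(String key);
    }

    /** In-memory implementation, only so the sketch can run locally. */
    static class InMemoryState implements CounterState {
        private final Map<String, Long> counters = new HashMap<>();

        public void incrCounter(String key, long amount) {
            counters.merge(key, amount, Long::sum);
        }

        public long getCounter(String key) {
            return counters.getOrDefault(key, 0L);
        }
    }

    /** Adds one to the counter of the category carried by the incoming message. */
    static void process(String category, CounterState state) {
        state.incrCounter(category, 1);
    }

    public static void main(String[] args) {
        CounterState state = new InMemoryState();
        for (String category : new String[] {"a", "b", "a"}) {
            process(category, state);
        }
        // prints "a=2 b=1"
        System.out.println("a=" + state.getCounter("a") + " b=" + state.getCounter("b"));
    }
}
```

In a deployed function the counter call would go to the state storage service instead of a local map, which is why the function fails when state is not enabled.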
After creating the function, I started it, but the REST API returned
"Operation not permitted".
The function logs show that it runs but cannot access the state
storage:
2023-05-05T14:52:09,719+0200 [archi/damir/AggregateByType-0] WARN
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Encountered
exception when processing message
PulsarRecord(topicName=Optional[persistent://archi/damir/breakdown-partition-1],
partition=0,
message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@5a87dcea],
schema=org.apache.pulsar.client.impl.schema.StringSchema@232c7c22,
failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$270/0x00000008010732d0@67552716,
ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$269/0x00000008010730a8@104a7d04,
customAckFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$268/0x0000000801072e70@71ff6c4f)
java.lang.IllegalStateException: State archi/damir/AggregateByType is not
enabled.
at
com.google.common.base.Preconditions.checkState(Preconditions.java:840)
~[java-instance.jar:?]
at
org.apache.pulsar.functions.instance.ContextImpl.ensureStateEnabled(ContextImpl.java:395)
~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.instance.ContextImpl.incrCounter(ContextImpl.java:409)
~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
at
fr.cnp.kfk.functions.AggregateByType.process(AggregateByType.java:25) ~[?:?]
at
fr.cnp.kfk.functions.AggregateByType.process(AggregateByType.java:13) ~[?:?]
at
org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:96)
~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:289)
~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
I found a similar exception on the Apache Pulsar mailing list,
but the advised configuration seems to be for a standalone cluster, not a
real cluster. The cause still looks the same, though (the broker logs show
something interesting):
2023-05-04T10:39:04,413+0200 [pulsar-web-37-5] INFO
org.apache.pulsar.common.nar.NarUnpacker - Created directory
/tmp/pulsar-nar/functions8147132923340170665.tmp-unpacked
2023-05-04T10:39:04,414+0200 [pulsar-web-37-5] INFO
org.apache.pulsar.common.nar.NarUnpacker - Extracting
/tmp/functions8147132923340170665.tmp to
/tmp/pulsar-nar/functions8147132923340170665.tmp-unpacked/O8ImIxPKJnMtvaUANK9KXA
2023-05-04T10:39:04,414+0200 [pulsar-web-37-5] ERROR
org.apache.pulsar.common.nar.NarUnpacker - There was a problem extracting the
nar file. Deleting
/tmp/pulsar-nar/functions8147132923340170665.tmp-unpacked/O8ImIxPKJnMtvaUANK9KXA
to clean up state.
java.io.FileNotFoundException:
/tmp/pulsar-nar/functions8147132923340170665.tmp-unpacked/O8ImIxPKJnMtvaUANK9KXA/META-INF/MANIFEST.MF
(No such file or directory)
at java.io.FileOutputStream.open0(Native Method) ~[?:?]
at java.io.FileOutputStream.open(FileOutputStream.java:293) ~[?:?]
at java.io.FileOutputStream.<init>(FileOutputStream.java:235)
~[?:?]
at java.io.FileOutputStream.<init>(FileOutputStream.java:184)
~[?:?]
at
org.apache.pulsar.common.nar.NarUnpacker.makeFile(NarUnpacker.java:143)
~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
at
org.apache.pulsar.common.nar.NarUnpacker.unpack(NarUnpacker.java:126)
~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
at
org.apache.pulsar.common.nar.NarUnpacker.doUnpackNar(NarUnpacker.java:95)
~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
at
org.apache.pulsar.common.nar.NarUnpacker.unpackNar(NarUnpacker.java:64)
~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
at
org.apache.pulsar.common.nar.NarClassLoader.getFromArchive(NarClassLoader.java:148)
~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
at
org.apache.pulsar.common.nar.NarClassLoaderBuilder.build(NarClassLoaderBuilder.java:72)
~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.utils.FunctionCommon.extractNarClassLoader(FunctionCommon.java:287)
~[org.apache.pulsar-pulsar-functions-utils-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.utils.FunctionCommon.getClassLoaderFromPackage(FunctionCommon.java:424)
~[org.apache.pulsar-pulsar-functions-utils-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.getClassLoaderFromPackage(ComponentImpl.java:1848)
~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.validateUpdateRequestParams(FunctionsImpl.java:813)
~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
at
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.updateFunction(FunctionsImpl.java:371)
~[org.apache.pulsar-pulsar-functions-worker-2.11.0.jar:2.11.0]
at
org.apache.pulsar.broker.admin.impl.FunctionsBase.updateFunction(FunctionsBase.java:326)
~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[?:?]
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
~[?:?]
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
at
org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
at
org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
at
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
at
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
at
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
at
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:67)
~[org.apache.pulsar-pulsar-broker-2.11.0.jar:2.11.0]
at
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202)
~[org.eclipse.jetty-jetty-servlets-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
~[org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at org.eclipse.jetty.server.Server.handle(Server.java:516)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
~[org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
~[org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
at
org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
~[org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar:9.4.48.v20220622]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
~[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
~[?:?]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
~[io.netty-netty-common-4.1.86.Final.jar:4.1.86.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Could you help me?
Thanks for your help.
GitHub link: https://github.com/apache/pulsar/discussions/20300