JarvisZhu opened a new issue #14228:
URL: https://github.com/apache/pulsar/issues/14228
Pulsar cluster: 10.66.107.31/32/33, with one broker instance and one bookie instance on each server.
Local ZooKeeper / configuration store cluster: 10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
Taking node 31 as an example:
broker.conf:
zookeeperServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
configurationStoreServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
brokerServicePortTls=6651
webServicePortTls=8443
advertisedAddress=10.66.107.31
clusterName=pulsar-cluster-1
functionsWorkerEnabled=true
bookkeeper.conf:
advertisedAddress=10.66.107.31
bookieId=31
zkServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
httpServerEnabled=true
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
functions_worker.yml:
workerId: 31
workerHostname: 10.66.107.31
configurationStoreServers: 10.66.107.34:2181
pulsarFunctionsCluster: pulsar-cluster-1
stateStorageServiceUrl: bk://localhost:4181
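Running the official example with localrun works: input and output messages flow normally. However, after creating the function with the create command (the status command reports it as running), messages sent to the input topic never arrive on the output topic when I listen on it.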
./pulsar-admin functions localrun \
--jar /usr/local/pulsar/examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.PublishFunction \
--tenant public \
--namespace default \
--name wordCount \
--inputs persistent://public/default/input \
--output persistent://public/default/output \
--log-topic persistent://public/default/logs \
--parallelism 3 \
--cpu 2 \
--ram 2147483648 \
--disk 10737418240 \
--user-config '{"ck":"cv"}'
Problem 1:
Calling the state-related (Stateful) methods of the context object throws an error:
java.lang.IllegalStateException: State public/default/wordCount is not enabled.
    at com.google.common.base.Preconditions.checkState(Preconditions.java:843)
    at org.apache.pulsar.functions.instance.ContextImpl.ensureStateEnabled(ContextImpl.java:365)
    at org.apache.pulsar.functions.instance.ContextImpl.getCounter(ContextImpl.java:391)
    at org.example.functions.WordCountFunction.lambda$process$0(WordCountFunction.java:68)
    at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
    at org.example.functions.WordCountFunction.process(WordCountFunction.java:62)
    at org.example.functions.WordCountFunction.process(WordCountFunction.java:18)
    at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271)
    at java.lang.Thread.run(Thread.java:748)
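For readers following the trace above: it points at a lambda inside `WordCountFunction.process` that calls `getCounter` on the context for each element of an `Arrays.asList(...)` list. The block below is a minimal, dependency-free sketch of that pattern, not the actual function from this report. `CounterStore` and `InMemoryCounterStore` are hypothetical stand-ins for the counter methods of Pulsar's `Context` (`incrCounter` and `getCounter`), and splitting the input on whitespace is an assumption. In a deployed function those calls go to the state storage service, and the `ensureStateEnabled` check in the trace appears to fail when the instance has no state store attached.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class WordCountSketch {

    // Hypothetical stand-in for the counter methods on Pulsar's Context.
    interface CounterStore {
        void incrCounter(String key, long amount);
        long getCounter(String key);
    }

    // In-memory implementation, assumed for illustration only.
    static final class InMemoryCounterStore implements CounterStore {
        private final Map<String, Long> counters = new HashMap<>();

        @Override
        public void incrCounter(String key, long amount) {
            counters.merge(key, amount, Long::sum);
        }

        @Override
        public long getCounter(String key) {
            return counters.getOrDefault(key, 0L);
        }
    }

    // Splits the input on whitespace (an assumption) and increments one counter per word.
    static void process(String input, CounterStore store) {
        Arrays.asList(input.trim().split("\\s+")).forEach(word -> {
            if (!word.isEmpty()) {
                store.incrCounter(word, 1);
            }
        });
    }

    public static void main(String[] args) {
        CounterStore store = new InMemoryCounterStore();
        process("hello pulsar hello", store);
        System.out.println("hello=" + store.getCounter("hello"));
        System.out.println("pulsar=" + store.getCounter("pulsar"));
    }
}
```

With the in-memory store the counters always work, which is the point of the comparison: the per-word logic itself is simple, so the exception in the report most likely concerns how state storage is wired up for the running instance rather than the function body.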
Problem 2:
An slf4j error is always reported, even when the method body does not log anything.
2022-02-09 17:41:54,521 public/default/wordCount-1 WARN org.apache.logging.slf4j.Log4jLogger caught java.lang.NullPointerException logging ParameterizedMessage: [{}] [{}] add message to batch, num messages in batch so far {}
java.lang.NullPointerException
    at org.apache.logging.log4j.core.config.AppenderControl.appenderErrorHandlerMessage(AppenderControl.java:113)
    at org.apache.logging.log4j.core.config.AppenderControl.isRecursiveCall(AppenderControl.java:105)
    at org.apache.logging.log4j.core.config.AppenderControl.shouldSkip(AppenderControl.java:88)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:81)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:543)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:502)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:485)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
    at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
    at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
    at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2198)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2152)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2135)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2022)
    at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1891)
    at org.apache.logging.slf4j.Log4jLogger.debug(Log4jLogger.java:134)
    at org.apache.pulsar.client.impl.BatchMessageContainerImpl.add(BatchMessageContainerImpl.java:64)
    at org.apache.pulsar.client.impl.ProducerImpl.serializeAndSendMessage(ProducerImpl.java:528)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:458)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
    at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
    at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
    at org.apache.pulsar.functions.instance.LogAppender.append(LogAppender.java:53)
    at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:543)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:502)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:485)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
    at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
    at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
    at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2198)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2152)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2135)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2022)
    at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1891)
    at org.apache.logging.slf4j.Log4jLogger.debug(Log4jLogger.java:134)
    at org.apache.pulsar.client.impl.BatchMessageContainerImpl.add(BatchMessageContainerImpl.java:64)
    at org.apache.pulsar.client.impl.ProducerImpl.serializeAndSendMessage(ProducerImpl.java:528)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:458)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
    at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
    at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
    at org.apache.pulsar.functions.instance.LogAppender.append(LogAppender.java:53)
    at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:543)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:502)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:485)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
    at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
    at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
    at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2198)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2152)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2135)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2028)
    at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1899)
    at org.apache.logging.slf4j.Log4jLogger.info(Log4jLogger.java:184)
    at org.example.functions.WordCountFunction.process(WordCountFunction.java:46)
    at org.example.functions.WordCountFunction.process(WordCountFunction.java:18)
    at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95)
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271)
    at java.lang.Thread.run(Thread.java:748)
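Problem 3:
The function outputs messages when run locally (localrun), but when deployed with the create command no messages are received from the output topic. The bookie keeps logging this error: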
09:28:51.004 [storage-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.clients.impl.container.StorageContainerChannel - Failed to fetch info of storage container (0) - 'StorageContainerError : StatusCode = INTERNAL_SERVER_ERROR, Error = fail to fetch location for storage container (0)'. Retry in 200 ms ...
09:28:51.205 [storage-scheduler-OrderedScheduler-0-0] INFO org.apache.bookkeeper.clients.impl.container.StorageContainerChannel - Failed to fetch info of storage container (0) - 'StorageContainerError : StatusCode = INTERNAL_SERVER_ERROR, Error = fail to fetch location for storage container (0)'. Retry in 200 ms ...