JarvisZhu opened a new issue #14228:
URL: https://github.com/apache/pulsar/issues/14228


   Pulsar cluster: 10.66.107.31/32/33, with one broker instance and one bookie instance on each server
   Local ZooKeeper / configuration store cluster: 10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
   
   Taking 31 as an example:
   broker.conf:
   zookeeperServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
   configurationStoreServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
   brokerServicePortTls=6651
   webServicePortTls=8443
   advertisedAddress=10.66.107.31
   clusterName=pulsar-cluster-1
   functionsWorkerEnabled=true
   
   bookkeeper.conf:
   advertisedAddress=10.66.107.31
   bookieId=31
   zkServers=10.66.107.34:2181,10.66.107.34:2182,10.66.107.34:2183
   httpServerEnabled=true
   extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
   
   functions_worker.yml:
   workerId: 31
   workerHostname: 10.66.107.31
   configurationStoreServers: 10.66.107.34:2181
   pulsarFunctionsCluster: pulsar-cluster-1
   stateStorageServiceUrl: bk://localhost:4181
   
   
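   The official example consumes and produces messages correctly under localrun. However, after the function is created with the create command (the status command reports it as running), messages sent to the input topic never arrive on the output topic.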
   
   ./pulsar-admin functions localrun \
   --jar /usr/local/pulsar/examples/api-examples.jar \
   --classname org.apache.pulsar.functions.api.examples.PublishFunction \
   --tenant public \
   --namespace default \
   --name wordCount \
   --inputs persistent://public/default/input \
   --output persistent://public/default/output \
   --log-topic persistent://public/default/logs \
   --parallelism 3 \
   --cpu 2 \
   --ram 2147483648 \
   --disk 10737418240 \
   --user-config '{"ck":"cv"}'
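
The issue does not show the create command that was actually used. A minimal sketch of what the cluster-mode counterpart of the localrun command above might look like, assuming the same jar, class, and topics were passed; it only prints the command so it can be reviewed before being run:

```shell
#!/bin/sh
# Sketch only: cluster-mode counterpart of the localrun command above.
# Assumption: same jar, class, and topics; the issue does not show the real command.
PULSAR_ADMIN="${PULSAR_ADMIN:-./pulsar-admin}"

# Print the command for review instead of running it.
create_cmd() {
  echo "$PULSAR_ADMIN functions create" \
    "--jar /usr/local/pulsar/examples/api-examples.jar" \
    "--classname org.apache.pulsar.functions.api.examples.PublishFunction" \
    "--tenant public --namespace default --name wordCount" \
    "--inputs persistent://public/default/input" \
    "--output persistent://public/default/output" \
    "--log-topic persistent://public/default/logs" \
    "--parallelism 3"
}

create_cmd
# To submit it to the cluster: eval "$(create_cmd)"
```

The resource and user-config flags from the localrun command are left out here only to keep the sketch short.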
   
   Problem 1:
   Calling the state-related methods of the context object throws the following error:
   java.lang.IllegalStateException: State public/default/wordCount is not enabled.
           at com.google.common.base.Preconditions.checkState(Preconditions.java:843)
           at org.apache.pulsar.functions.instance.ContextImpl.ensureStateEnabled(ContextImpl.java:365)
           at org.apache.pulsar.functions.instance.ContextImpl.getCounter(ContextImpl.java:391)
           at org.example.functions.WordCountFunction.lambda$process$0(WordCountFunction.java:68)
           at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
           at org.example.functions.WordCountFunction.process(WordCountFunction.java:62)
           at org.example.functions.WordCountFunction.process(WordCountFunction.java:18)
           at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95)
           at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271)
           at java.lang.Thread.run(Thread.java:748)
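
The trace does not say whether it came from localrun or from the created function. The failing check in ContextImpl.ensureStateEnabled suggests the function instance was started without a state store. For the localrun case, one thing that may be worth trying is passing the state storage URL explicitly. The sketch below assumes the stream storage service listens on port 4181 of the bookie host (the port from stateStorageServiceUrl in functions_worker.yml) and uses a hypothetical FUNCTION_JAR path, since the issue does not give the jar location for org.example.functions.WordCountFunction. It only prints the command:

```shell
#!/bin/sh
# Sketch only: pass the state storage URL explicitly to localrun.
# Assumptions: stream storage is reachable on port 4181 of the bookie host;
# FUNCTION_JAR is a hypothetical placeholder for the jar that contains
# org.example.functions.WordCountFunction.
PULSAR_ADMIN="${PULSAR_ADMIN:-./pulsar-admin}"
FUNCTION_JAR="${FUNCTION_JAR:-/path/to/wordcount-function.jar}"
STATE_URL="${STATE_URL:-bk://10.66.107.31:4181}"

# Print the command for review instead of running it.
localrun_with_state_cmd() {
  echo "$PULSAR_ADMIN functions localrun" \
    "--jar $FUNCTION_JAR" \
    "--classname org.example.functions.WordCountFunction" \
    "--tenant public --namespace default --name wordCount" \
    "--inputs persistent://public/default/input" \
    "--output persistent://public/default/output" \
    "--state-storage-service-url $STATE_URL"
}

localrun_with_state_cmd
```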
   Problem 2:
   An slf4j error is always reported, even when the method body does not log anything.
   2022-02-09 17:41:54,521 public/default/wordCount-1 WARN org.apache.logging.slf4j.Log4jLogger caught java.lang.NullPointerException logging ParameterizedMessage: [{}] [{}] add message to batch, num messages in batch so far {} java.lang.NullPointerException
           at org.apache.logging.log4j.core.config.AppenderControl.appenderErrorHandlerMessage(AppenderControl.java:113)
           at org.apache.logging.log4j.core.config.AppenderControl.isRecursiveCall(AppenderControl.java:105)
           at org.apache.logging.log4j.core.config.AppenderControl.shouldSkip(AppenderControl.java:88)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:81)
           at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:543)
           at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:502)
           at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:485)
           at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
           at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
           at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
           at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2198)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2152)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2135)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2022)
           at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1891)
           at org.apache.logging.slf4j.Log4jLogger.debug(Log4jLogger.java:134)
           at org.apache.pulsar.client.impl.BatchMessageContainerImpl.add(BatchMessageContainerImpl.java:64)
           at org.apache.pulsar.client.impl.ProducerImpl.serializeAndSendMessage(ProducerImpl.java:528)
           at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:458)
           at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
           at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
           at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
           at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
           at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
           at org.apache.pulsar.functions.instance.LogAppender.append(LogAppender.java:53)
           at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
           at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:543)
           at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:502)
           at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:485)
           at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
           at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
           at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
           at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2198)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2152)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2135)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2022)
           at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1891)
           at org.apache.logging.slf4j.Log4jLogger.debug(Log4jLogger.java:134)
           at org.apache.pulsar.client.impl.BatchMessageContainerImpl.add(BatchMessageContainerImpl.java:64)
           at org.apache.pulsar.client.impl.ProducerImpl.serializeAndSendMessage(ProducerImpl.java:528)
           at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:458)
           at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
           at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
           at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
           at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
           at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
           at org.apache.pulsar.functions.instance.LogAppender.append(LogAppender.java:53)
           at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
           at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
           at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:543)
           at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:502)
           at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:485)
           at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
           at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
           at org.apache.logging.log4j.core.Logger.log(Logger.java:161)
           at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2198)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2152)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2135)
           at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2028)
           at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1899)
           at org.apache.logging.slf4j.Log4jLogger.info(Log4jLogger.java:184)
           at org.example.functions.WordCountFunction.process(WordCountFunction.java:46)
           at org.example.functions.WordCountFunction.process(WordCountFunction.java:18)
           at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95)
           at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271)
           at java.lang.Thread.run(Thread.java:748)
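   Problem 3:
   Messages are output when the function runs locally (localrun), but when it is deployed with the create command, no messages arrive on the output topic. The bookie keeps logging this error: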
   09:28:51.004 [storage-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.clients.impl.container.StorageContainerChannel - Failed to fetch info of storage container (0) - 'StorageContainerError : StatusCode = INTERNAL_SERVER_ERROR, Error = fail to fetch location for storage container (0)'. Retry in 200 ms ...
   09:28:51.205 [storage-scheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.clients.impl.container.StorageContainerChannel - Failed to fetch info of storage container (0) - 'StorageContainerError : StatusCode = INTERNAL_SERVER_ERROR, Error = fail to fetch location for storage container (0)'. Retry in 200 ms ...
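
The check described in Problem 3 (status shows running, then produce to the input topic and watch the output topic) can be sketched as the commands below. This is an illustration, not the reporter's exact procedure: it assumes pulsar-admin and pulsar-client are in the current directory, and the subscription name issue-check is made up for the example. The script only prints the commands:

```shell
#!/bin/sh
# Sketch only: commands to re-check the cluster-mode function end to end.
# Assumptions: pulsar-admin and pulsar-client sit in the directory named by BIN;
# the subscription name "issue-check" is hypothetical.
BIN="${BIN:-.}"

check_cmds() {
  # 1. Confirm the function instances report running.
  echo "$BIN/pulsar-admin functions status --tenant public --namespace default --name wordCount"
  # 2. Start a consumer on the output topic (-n 0 keeps consuming).
  echo "$BIN/pulsar-client consume persistent://public/default/output -s issue-check -n 0"
  # 3. In another terminal, publish a test message to the input topic.
  echo "$BIN/pulsar-client produce persistent://public/default/input -m hello"
}

check_cmds
```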
   
   
   

