[ 
https://issues.apache.org/jira/browse/FLINK-30616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656503#comment-17656503
 ] 

Yufan Sheng commented on FLINK-30616:
-------------------------------------

Thanks for submitting this issue. This is a known issue and has been resolved
in https://github.com/apache/flink-connector-pulsar/pull/11. We just need to
backport the fix to the {{pulsar-3.0}} branch. Can you close this issue as a duplicate?

> Don't support batchMessageId when restore from checkpoint
> ---------------------------------------------------------
>
>                 Key: FLINK-30616
>                 URL: https://issues.apache.org/jira/browse/FLINK-30616
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.16.0, pulsar-3.0.0
>         Environment: flink version: 1.16.0
> flink-connector-pulsar version: 1.16.0
>            Reporter: songv
>            Priority: Major
>
> I have a non-partitioned topic:
>  * the producer for the topic sends batch messages to it (to improve producer throughput)
>  * the Flink job consumes this topic with the Exclusive subscription type
>
> When the Flink task manager is restarted for some reason, an exception is
> thrown while restoring from the checkpoint:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered the exception
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106) ~[?:?]
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
> ... 1 more
> Caused by: java.lang.IllegalArgumentException: We only support normal message id currently. This batch size is %d [83]
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:65) ~[?:?]
> at org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:44) ~[?:?]
> at org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:92) ~[?:?]
> at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:171) ~[?:?]
> at org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:51) ~[?:?]
> at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
> ... 1 more
> {code}
> Some important logs from the task manager:
> {code:java}
> 2023-01-09 14:51:01,645 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating operator state backend for SourceOperator_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) and restoring with state from alternative (1/1).
> 2023-01-09 14:51:01,664 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]
> 2023-01-09 14:51:01,740 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Enqueued task AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Cleaned wakeup flag.
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,742 DEBUG org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Handle split changes SplitAddition:[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,742 INFO  org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader [] - Reset subscription position by the checkpoint 25551:17912:-1:8
> 2023-01-09 14:51:01,743 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> {code}
> I don't know whether this is a feature or a bug, but it means we cannot
> restore from a checkpoint that stores a batch message ID. I would like to know
> how best to handle this. [~Tison]
>  
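A note on the odd `%d [83]` at the end of the `IllegalArgumentException` message: Flink's `Preconditions` helpers, like Guava's, appear to substitute only `%s` placeholders and append any leftover arguments in square brackets, so a template written with `%d` is printed verbatim and the value (here the batch size, 83) is tacked on at the end. The sketch below is a simplified, self-contained model of that formatting rule, not Flink's source; the class name `PreconditionsFormatSketch` is hypothetical.

```java
/**
 * Hypothetical, simplified model of "%s"-only message formatting as used by
 * Preconditions-style helpers. Not Flink's actual class.
 */
final class PreconditionsFormatSketch {

    static String format(String template, Object... args) {
        StringBuilder builder = new StringBuilder();
        int templateStart = 0;
        int i = 0;
        // Only "%s" placeholders are substituted; "%d" stays as literal text.
        while (i < args.length) {
            int placeholder = template.indexOf("%s", templateStart);
            if (placeholder == -1) {
                break;
            }
            builder.append(template, templateStart, placeholder);
            builder.append(args[i++]);
            templateStart = placeholder + 2;
        }
        builder.append(template.substring(templateStart));
        // Arguments without a matching "%s" are appended in square brackets.
        if (i < args.length) {
            builder.append(" [").append(args[i++]);
            while (i < args.length) {
                builder.append(", ").append(args[i++]);
            }
            builder.append(']');
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        // Reproduces the shape of the message seen in the stack trace.
        System.out.println(format(
                "We only support normal message id currently. This batch size is %d", 83));
    }
}
```

Under this model, writing the template with `%s` instead of `%d` would yield the intended "This batch size is 83".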

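The checkpointed position in the log, `25551:17912:-1:8`, appears to follow Pulsar's usual string form for a batch message ID, `ledgerId:entryId:partitionIndex:batchIndex`. Partition index `-1` fits the non-partitioned topic, and batch index `8` points inside an entry that, according to the exception, holds a batch of 83 messages. The sketch below models that position with a hypothetical `Position` record (Java 16+) and shows one possible batch-aware "next position" rule. Both the record and the rule are illustrative assumptions, not the connector's API and not the fix merged in the pull request above.

```java
/**
 * Hypothetical model of a Pulsar position printed as
 * ledgerId:entryId:partitionIndex[:batchIndex]. Not the Pulsar client API.
 */
final class BatchPositionSketch {

    /** batchIndex is -1 for a non-batched (normal) message ID. */
    record Position(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
        boolean isBatched() {
            return batchIndex >= 0;
        }
    }

    /** Parses "ledger:entry:partition" or "ledger:entry:partition:batch". */
    static Position parse(String text) {
        String[] parts = text.split(":");
        if (parts.length != 3 && parts.length != 4) {
            throw new IllegalArgumentException("Unexpected message id: " + text);
        }
        int batchIndex = parts.length == 4 ? Integer.parseInt(parts[3]) : -1;
        return new Position(
                Long.parseLong(parts[0]),
                Long.parseLong(parts[1]),
                Integer.parseInt(parts[2]),
                batchIndex);
    }

    /**
     * Assumed rule: stay on the same entry while messages remain in the batch,
     * otherwise move to the next entry as a normal message ID.
     */
    static Position next(Position p, int batchSize) {
        if (p.isBatched() && p.batchIndex() + 1 < batchSize) {
            return new Position(p.ledgerId(), p.entryId(), p.partitionIndex(), p.batchIndex() + 1);
        }
        return new Position(p.ledgerId(), p.entryId() + 1, p.partitionIndex(), -1);
    }

    public static void main(String[] args) {
        Position checkpoint = parse("25551:17912:-1:8");
        System.out.println(checkpoint.isBatched()); // true
        System.out.println(next(checkpoint, 83));
    }
}
```

Advancing by entry alone would skip batch indices 9 to 82 of entry 17912; staying on the same entry avoids that, assuming the consumer can resume from a position inside a batch.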


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
