[
https://issues.apache.org/jira/browse/FLINK-30616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656515#comment-17656515
]
Yufan Sheng commented on FLINK-30616:
-------------------------------------
[~songv] The [backport
PR|https://github.com/apache/flink-connector-pulsar/pull/16] has been
submitted. You can check it out and build it locally.
> Don't support batchMessageId when restore from checkpoint
> ---------------------------------------------------------
>
> Key: FLINK-30616
> URL: https://issues.apache.org/jira/browse/FLINK-30616
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.16.0, pulsar-3.0.0
> Environment: flink version: 1.16.0
> flink-connector-pulsar version: 1.16.0
> Reporter: songv
> Priority: Major
>
> I have a non-partitioned topic:
> * the producer for the topic sends batched messages to the topic (to improve
> producer throughput)
> * the Flink job consumes this topic with the Exclusive subscription type
> When the Flink task manager is restarted for some reason, an exception is
> thrown when the job is restored from the checkpoint:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered the
> exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)
> ~[?:?]
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
> ... 1 more
> Caused by: java.lang.IllegalArgumentException: We only support normal message
> id currently. This batch size is %d [83]
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
> ~[flink-dist-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:65)
> ~[?:?]
> at
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:44)
> ~[?:?]
> at
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:92)
> ~[?:?]
> at
> org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:171)
> ~[?:?]
> at
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:51)
> ~[?:?]
> at
> org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> ~[flink-connector-files-1.16.0.jar:1.16.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
> ... 1 more{code}
> Some relevant logs from the task manager:
> {code:java}
> 2023-01-09 14:51:01,645 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating operator state backend for SourceOperator_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) and restoring with state from alternative (1/1).
> 2023-01-09 14:51:01,664 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]
> 2023-01-09 14:51:01,740 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Enqueued task AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Cleaned wakeup flag.
> 2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,742 DEBUG org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Handle split changes SplitAddition:[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
> 2023-01-09 14:51:01,742 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader [] - Reset subscription position by the checkpoint 25551:17912:-1:8
> 2023-01-09 14:51:01,743 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> {code}
> I don't know whether this is intended behavior or a bug, but it means that we
> can't restore from a checkpoint that holds a batch message ID. I would like to
> know the best way to handle this. [~Tison]
>
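For readers following the trace, here is a minimal sketch of why the restored position is rejected. It assumes that the position text in the log (25551:17912:-1:8) follows the ledgerId:entryId:partitionIndex:batchIndex layout, so the trailing 8 is a batch index. The class and method names below are hypothetical and only illustrate the kind of guard that appears in the stack trace; this is not the connector's actual code.

```java
/** Hypothetical illustration only; not taken from flink-connector-pulsar. */
public class CheckpointPositionSketch {

    /** Position parsed from log text; batchIndex is -1 when no batch part is present. */
    public static final class Position {
        public final long ledgerId;
        public final long entryId;
        public final int partitionIndex;
        public final int batchIndex;

        public Position(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
            this.batchIndex = batchIndex;
        }

        /** True when the position points inside a batched entry. */
        public boolean isBatch() {
            return batchIndex >= 0;
        }
    }

    /** Parses "ledger:entry:partition" or "ledger:entry:partition:batch" (assumed layout). */
    public static Position parse(String text) {
        String[] parts = text.split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("Unexpected position format: " + text);
        }
        int batchIndex = parts.length == 4 ? Integer.parseInt(parts[3]) : -1;
        return new Position(
                Long.parseLong(parts[0]),
                Long.parseLong(parts[1]),
                Integer.parseInt(parts[2]),
                batchIndex);
    }

    /** A guard in the spirit of the failing check: accepts only non-batch positions. */
    public static Position requireNonBatch(Position position) {
        if (position.isBatch()) {
            throw new IllegalArgumentException(
                    "Batch message id is not supported, batch index: " + position.batchIndex);
        }
        return position;
    }

    public static void main(String[] args) {
        // Position text copied from the "Reset subscription position" log line.
        Position restored = parse("25551:17912:-1:8");
        System.out.println("is batch id: " + restored.isBatch());
        try {
            requireNonBatch(restored);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, any checkpoint taken while the reader was in the middle of a batched entry would carry a batch index and fail such a guard on restore, which matches the symptom described in the issue.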
--
This message was sent by Atlassian Jira
(v8.20.10#820010)