songv created FLINK-30616:
-----------------------------
Summary: Don't support batchMessageId when restore from checkpoint
Key: FLINK-30616
URL: https://issues.apache.org/jira/browse/FLINK-30616
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: pulsar-3.0.0
Environment: flink version: 1.16.0
flink-connector-pulsar version: 1.16.0
Reporter: songv
I have a non-partitioned topic:
* the producer sends batched messages to the topic (to improve the
speed of the producer)
* the Flink job consumes this topic with the Exclusive subscription type
When the Flink task manager restarts for some reason, an exception is
thrown while restoring from the checkpoint:
java.lang.RuntimeException: One or more fetchers have encountered the exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106) ~[?:?]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    ... 1 more
Caused by: java.lang.IllegalArgumentException: We only support normal message id currently. This batch size is %d [83]
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:65) ~[?:?]
    at org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:44) ~[?:?]
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:92) ~[?:?]
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:171) ~[?:?]
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:51) ~[?:?]
    at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    ... 1 more
Some important logs from the task manager:
{code:java}
2023-01-09 14:51:01,645 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating operator state backend for SourceOperator_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) and restoring with state from alternative (1/1).
2023-01-09 14:51:01,664 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]
2023-01-09 14:51:01,740 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Enqueued task AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Cleaned wakeup flag.
2023-01-09 14:51:01,741 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
2023-01-09 14:51:01,742 DEBUG org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Handle split changes SplitAddition:[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]
2023-01-09 14:51:01,742 INFO org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader [] - Reset subscription position by the checkpoint 25551:17912:-1:8
2023-01-09 14:51:01,743 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
{code}
I don't know whether this is intended behavior or a bug, but it means that a
job can't be restored from a checkpoint that holds a batch message ID. I would
like to know how best to handle this.
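To make the failing case concrete, here is a minimal, self-contained sketch of how a "next position" could be computed for both plain and batch message IDs. It is an illustration only: SketchMessageId, parse and next are hypothetical names, not the connector's or the Pulsar client's API, and this is not a claim about how the connector does or should fix the problem. The sample values (ID 25551:17912:-1:8, batch size 83) are taken from the log and the exception above.

```java
/** Hypothetical stand-in for a Pulsar message ID; NOT the connector's real class. */
final class SketchMessageId {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;
    final int batchIndex; // -1 when the message is not part of a batch
    final int batchSize;  // 0 when the message is not part of a batch

    SketchMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
    }

    /** Parses "ledger:entry:partition" or "ledger:entry:partition:batchIndex". */
    static SketchMessageId parse(String text, int batchSize) {
        String[] parts = text.split(":");
        if (parts.length != 3 && parts.length != 4) {
            throw new IllegalArgumentException("Unexpected message id: " + text);
        }
        long ledger = Long.parseLong(parts[0]);
        long entry = Long.parseLong(parts[1]);
        int partition = Integer.parseInt(parts[2]);
        int batchIdx = parts.length == 4 ? Integer.parseInt(parts[3]) : -1;
        return new SketchMessageId(ledger, entry, partition, batchIdx, batchIdx >= 0 ? batchSize : 0);
    }

    /** Position of the message that follows this one (assumption: same ledger). */
    SketchMessageId next() {
        if (batchIndex >= 0 && batchIndex + 1 < batchSize) {
            // Still inside the same batched entry: only the batch index advances.
            return new SketchMessageId(ledgerId, entryId, partitionIndex, batchIndex + 1, batchSize);
        }
        // Plain message, or last message of a batch: move on to the next entry.
        return new SketchMessageId(ledgerId, entryId + 1, partitionIndex, -1, 0);
    }

    @Override
    public String toString() {
        String base = ledgerId + ":" + entryId + ":" + partitionIndex;
        return batchIndex >= 0 ? base + ":" + batchIndex : base;
    }

    public static void main(String[] args) {
        SketchMessageId restored = parse("25551:17912:-1:8", 83);
        // prints "25551:17912:-1:8 -> 25551:17912:-1:9"
        System.out.println(restored + " -> " + restored.next());
    }
}
```

The sketch ignores ledger rollover (the next entry may live in a new ledger), so it only shows the shape of the decision, namely that a batch index can be advanced instead of rejected.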