[ 
https://issues.apache.org/jira/browse/FLINK-30616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

songv updated FLINK-30616:
--------------------------
    Description: 
I have a non-partition topic: 
 * the producer for the topic sends batch messages to the topic(to improve the 
speed of producers)
 * the flink job consumes this topic by Exclusive subscription type

When the flink task manager is restarted for some reason, an exception is 
thrown when restored from the checkpoint:
{code:java}
java.lang.RuntimeException: One or more fetchers have encountered the exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)
 ~[?:?]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) 
~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
 ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) 
~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) 
~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
~[flink-dist-1.16.0.jar:1.16.0]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
... 1 more
Caused by: java.lang.IllegalArgumentException: We only support normal message 
id currently. This batch size is %d [83]
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) 
~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:65)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:44)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:92)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:171)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:51)
 ~[?:?]
at 
org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
... 1 more{code}
some importance logs in the task manager:

 
{code:java}
2023-01-09 14:51:01,645 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating 
operator state backend for 
SourceOperator_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) and restoring with state 
from alternative (1/1).2023-01-09 14:51:01,664 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: 
[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]2023-01-09
 14:51:01,740 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 02023-01-09 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: 
[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Enqueued task AddSplitsTask: 
[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Cleaned 
wakeup flag.2023-01-09 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: 
[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,742 DEBUG 
org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
 [] - Handle split changes 
SplitAddition:[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,742 INFO  
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader
 [] - Reset subscription position by the checkpoint 25551:17912:-1:82023-01-09 
14:51:01,743 ERROR 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - 
Received uncaught exception.java.lang.RuntimeException: SplitFetcher thread 0 
received unexpected exception while polling the records {code}
I don't know if it is a feature or a bug, but this means that we can't restore 
from a batch message id checkpoint. I would like to know what to do better. 
[~Tison] 

 

  was:
I have a non-partition topic: 
 * the producer for the topic sends batch messages to the topic(to improve the 
speed of producers)
 * the flink job consumes this topic by Exclusive subscription type

When the flink task manager is restarted for some reason, an exception is 
thrown when restored from the checkpoint:
{code:java}
java.lang.RuntimeException: One or more fetchers have encountered the exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)
 ~[?:?]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) 
~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
 ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) 
~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) 
~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
~[flink-dist-1.16.0.jar:1.16.0]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
... 1 more
Caused by: java.lang.IllegalArgumentException: We only support normal message 
id currently. This batch size is %d [83]
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) 
~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:65)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:44)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:92)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:171)
 ~[?:?]
at 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:51)
 ~[?:?]
at 
org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 ~[flink-connector-files-1.16.0.jar:1.16.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
... 1 more{code}

some importance logs in the task manager:

 
{code:java}
2023-01-09 14:51:01,645 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating 
operator state backend for 
SourceOperator_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) and restoring with state 
from alternative (1/1).2023-01-09 14:51:01,664 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: 
[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]2023-01-09
 14:51:01,740 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 02023-01-09 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: 
[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Enqueued task AddSplitsTask: 
[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Cleaned 
wakeup flag.2023-01-09 14:51:01,741 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: 
[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,742 DEBUG 
org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
 [] - Handle split changes 
SplitAddition:[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
 14:51:01,742 INFO  
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader
 [] - Reset subscription position by the checkpoint 25551:17912:-1:82023-01-09 
14:51:01,743 ERROR 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - 
Received uncaught exception.java.lang.RuntimeException: SplitFetcher thread 0 
received unexpected exception while polling the records {code}
I don't know if it is a feature or a bug, but this means that we can't restore 
from a batch message id checkpoint. I would like to know what to do better. 

 


> Don't support batchMessageId when restore from checkpoint
> ---------------------------------------------------------
>
>                 Key: FLINK-30616
>                 URL: https://issues.apache.org/jira/browse/FLINK-30616
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: pulsar-3.0.0
>         Environment: flink version: 1.16.0
> flink-connector-pulsar version: 1.16.0
>            Reporter: songv
>            Priority: Major
>
> I have a non-partition topic: 
>  * the producer for the topic sends batch messages to the topic(to improve 
> the speed of producers)
>  * the flink job consumes this topic by Exclusive subscription type
> When the flink task manager is restarted for some reason, an exception is 
> thrown when restored from the checkpoint:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered the 
> exception
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)
>  ~[?:?]
> at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:403)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>  ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) 
> ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) 
> ~[flink-dist-1.16.0.jar:1.16.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.0.jar:1.16.0]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
> ... 1 more
> Caused by: java.lang.IllegalArgumentException: We only support normal message 
> id currently. This batch size is %d [83]
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) 
> ~[flink-dist-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:65)
>  ~[?:?]
> at 
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:44)
>  ~[?:?]
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:92)
>  ~[?:?]
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:171)
>  ~[?:?]
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:51)
>  ~[?:?]
> at 
> org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>  ~[flink-connector-files-1.16.0.jar:1.16.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
> ... 1 more{code}
> some importance logs in the task manager:
>  
> {code:java}
> 2023-01-09 14:51:01,645 DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> SourceOperator_cbc357ccb763df2852fee8c4fc7d55f2_(1/1) and restoring with 
> state from alternative (1/1).2023-01-09 14:51:01,664 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: 
> [PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]2023-01-09
>  14:51:01,740 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Starting split fetcher 02023-01-09 14:51:01,741 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run AddSplitsTask: 
> [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
>  14:51:01,741 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Enqueued task AddSplitsTask: 
> [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
>  14:51:01,741 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Cleaned wakeup flag.2023-01-09 14:51:01,741 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run AddSplitsTask: 
> [[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
>  14:51:01,742 DEBUG 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
>  [] - Handle split changes 
> SplitAddition:[[PulsarPartitionSplit{partition=persistent://ethereum-prod/raw/transactions|[0-65535]}]]2023-01-09
>  14:51:01,742 INFO  
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader
>  [] - Reset subscription position by the checkpoint 
> 25551:17912:-1:82023-01-09 14:51:01,743 ERROR 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] 
> - Received uncaught exception.java.lang.RuntimeException: SplitFetcher thread 
> 0 received unexpected exception while polling the records {code}
> I don't know if it is a feature or a bug, but this means that we can't 
> restore from a batch message id checkpoint. I would like to know what to do 
> better. [~Tison] 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to