[
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Kania updated FLINK-33135:
--------------------------------
Description:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter,
ensures that the list of partitions is empty during deployment and then
complains when the list of partitions supplied to it is empty at runtime. The
default TopicRouter that is created is the RoundRobinTopicRouter and it
provides a nonsensical error for this type of TopicRouter. This error message
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.
The connector should not be applying a topic router to nonpartitioned topics or
should treat the nonpartitioned topic as a special/different case. Currently,
the following error is raised even though the setTopics() method is called on
the PulsarSink.builder() with a single topic.
Caused by: java.lang.IllegalArgumentException: You should provide topics for
routing topic by message key hash.
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
at
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 30 more
The distinctTopics() method of the TopicNameUtils class is what ensures the
list of partitions is empty.
was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter,
ensures that the list of partitions is empty during deployment and then
complains when the list of partitions supplied to it is empty at runtime. The
default TopicRouter that is created is the RoundRobinTopicRouter and it
provides a nonsensical error for this type of TopicRouter. This error message
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.
The connector should not be applying a topic router to nonpartitioned topics or
should treat the nonpartitioned topic as a special case. Currently, the
following error is raised even though the setTopics() method is called on the
PulsarSink.builder() with a single topic.
Caused by: java.lang.IllegalArgumentException: You should provide topics for
routing topic by message key hash.
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
at
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 30 more
The distinctTopics() method of the TopicNameUtils class is what ensures the
list of partitions is empty.
> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --------------------------------------------------------------------------
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.17.1
> Reporter: Jason Kania
> Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a
> TopicRouter, ensures that the list of partitions is empty during deployment
> and then complains when the list of partitions supplied to it is empty at
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter
> and it provides a nonsensical error for this type of TopicRouter. This error
> message issue is raised in ticket
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics
> or should treat the nonpartitioned topic as a special/different case.
> Currently, the following error is raised even though the setTopics() method
> is called on the PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for
> routing topic by message key hash.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
> at
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 30 more
> The distinctTopics() method of the TopicNameUtils class is what ensures the
> list of partitions is empty.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)