[
https://issues.apache.org/jira/browse/FLINK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854929#comment-17854929
]
Yufan Sheng commented on FLINK-35564:
-------------------------------------
Hi, thanks for mention this. I think this bug has been fixed in the latest main
branch. But we may never backport to the 1.17 branch, I think you can upgrade
to the latest connector for fixing this issue.
https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java#L173
> The topic cannot be distributed on subtask when calculatePartitionOwner
> returns -1
> ----------------------------------------------------------------------------------
>
> Key: FLINK-35564
> URL: https://issues.apache.org/jira/browse/FLINK-35564
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.17.2
> Reporter: 中国无锡周良
> Priority: Major
>
> The topic cannot be distributed on subtask when calculatePartitionOwner
> returns -1
> {code:java}
> @VisibleForTesting
> static int calculatePartitionOwner(String topic, int partitionId, int
> parallelism) {
> int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
> /*
> * Here, the assumption is that the id of Pulsar partitions are always
> ascending starting from
> * 0. Therefore, can be used directly as the offset clockwise from the
> start index.
> */
> return (startIndex + partitionId) % parallelism;
> } {code}
> Here startIndex is a non-negative number calculated based on topic.hashCode()
> and in the range [0, parallelism-1].
> For non-partitioned topic. partitionId is NON_PARTITION_ID = -1;
> but
> {code:java}
> @Override
> public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
> List<Integer> readers) {
> if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
> return Optional.empty();
> }
> Map<Integer, List<PulsarPartitionSplit>> assignMap =
> new HashMap<>(pendingPartitionSplits.size());
> for (Integer reader : readers) {
> Set<PulsarPartitionSplit> splits =
> pendingPartitionSplits.remove(reader);
> if (splits != null && !splits.isEmpty()) {
> assignMap.put(reader, new ArrayList<>(splits));
> }
> }
> if (assignMap.isEmpty()) {
> return Optional.empty();
> } else {
> return Optional.of(new SplitsAssignment<>(assignMap));
> }
> } {code}
> pendingPartitionSplits can't possibly have a value of -1, right? The
> calculation method of the topic by the above return 1,
> pendingPartitionSplits. Remove (reader), forever is null; This topic will not
> be assigned to a subtask; And I simulated this topic locally and found that
> messages were indeed not processed;
--
This message was sent by Atlassian Jira
(v8.20.10#820010)