[
https://issues.apache.org/jira/browse/FLINK-29128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598157#comment-17598157
]
Leo zhang commented on FLINK-29128:
-----------------------------------
Actually, in the unbounded case we do assign splits that are currently empty,
since they may eventually contain data. The notion of an "empty split" applies
only to the bounded case; in the unbounded case such a split is not treated as
empty.
Here is some evidence:
1. By default, if no stopping offset is passed to the constructor, the split
uses NO_STOPPING_OFFSET, which is defined as Long.MIN_VALUE, a negative number.
    public KafkaPartitionSplit(TopicPartition tp, long startingOffset) {
        this(tp, startingOffset, NO_STOPPING_OFFSET);
    }
2. In KafkaPartitionSplit#getStoppingOffset, any negative value other than
LATEST_OFFSET (-1) or COMMITTED_OFFSET (-3) is treated as "no stopping offset",
so Optional.empty() is returned.
    public Optional<Long> getStoppingOffset() {
        return stoppingOffset >= 0
                        || stoppingOffset == LATEST_OFFSET // -1
                        || stoppingOffset == COMMITTED_OFFSET // -3
                ? Optional.of(stoppingOffset)
                : Optional.empty();
    }
3. KafkaPartitionSplitReader#parseStoppingOffsets parses the stopping offset
only if split.getStoppingOffset() is present. So if no stopping offset is set,
or it is set to an invalid negative number, no stopping offset is parsed.
    private void parseStoppingOffsets(
            KafkaPartitionSplit split,
            List<TopicPartition> partitionsStoppingAtLatest,
            Set<TopicPartition> partitionsStoppingAtCommitted) {
        TopicPartition tp = split.getTopicPartition();
        split.getStoppingOffset()
                .ifPresent(
                        stoppingOffset -> {
                            if (stoppingOffset >= 0) {
                                stoppingOffsets.put(tp, stoppingOffset);
                            } else if (stoppingOffset == KafkaPartitionSplit.LATEST_OFFSET) {
                                partitionsStoppingAtLatest.add(tp);
                            } else if (stoppingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
                                partitionsStoppingAtCommitted.add(tp);
                            } else {
                                // This should not happen.
                                throw new FlinkRuntimeException(
                                        String.format(
                                                "Invalid stopping offset %d for partition %s",
                                                stoppingOffset, tp));
                            }
                        });
    }
4. If the stopping offset is LATEST_OFFSET or COMMITTED_OFFSET,
KafkaPartitionSplitReader#acquireAndSetStoppingOffsets replaces it with the
actual offset.
5. In KafkaPartitionSplitReader#getStoppingOffset, the default stopping offset
is Long.MAX_VALUE. So if no stopping offset is set, the partition's stopping
offset is effectively Long.MAX_VALUE: the stream behaves as unbounded and runs
until the offset reaches Long.MAX_VALUE.
    private long getStoppingOffset(TopicPartition tp) {
        return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
    }
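The steps above can be condensed into a small standalone sketch. It does not
use the Flink classes: StoppingOffsetSketch and its method names are
hypothetical, partitions are plain strings, and only the constant values and
the Long.MAX_VALUE default are taken from the snippets quoted above. Step 4
(resolving LATEST_OFFSET / COMMITTED_OFFSET against the broker) is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in, not part of Flink.
class StoppingOffsetSketch {
    // Sentinel values as quoted above from KafkaPartitionSplit.
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
    static final long LATEST_OFFSET = -1;
    static final long COMMITTED_OFFSET = -3;

    // Same condition as the quoted KafkaPartitionSplit#getStoppingOffset.
    static Optional<Long> stoppingOffsetOf(long stoppingOffset) {
        return stoppingOffset >= 0
                        || stoppingOffset == LATEST_OFFSET
                        || stoppingOffset == COMMITTED_OFFSET
                ? Optional.of(stoppingOffset)
                : Optional.empty();
    }

    // Steps 3 and 5 combined: only a concrete (non-negative) offset is
    // recorded; anything else falls back to Long.MAX_VALUE. The broker
    // lookup of step 4 is intentionally omitted, so the two sentinel
    // offsets also end up at the default here.
    static long effectiveStoppingOffset(String partition, long configured) {
        Map<String, Long> stoppingOffsets = new HashMap<>();
        stoppingOffsetOf(configured)
                .filter(offset -> offset >= 0)
                .ifPresent(offset -> stoppingOffsets.put(partition, offset));
        return stoppingOffsets.getOrDefault(partition, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        // No stopping offset configured: treated as unbounded.
        System.out.println(effectiveStoppingOffset("topic-0", NO_STOPPING_OFFSET));
        // Explicit stopping offset: bounded at 100.
        System.out.println(effectiveStoppingOffset("topic-0", 100L));
    }
}
```

With no stopping offset configured the sketch yields Long.MAX_VALUE, which is
the "unbounded" behaviour described in step 5.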
To summarize:
1. A stopping offset is parsed and set only when stoppingOffset >= 0 or it
equals LATEST_OFFSET or COMMITTED_OFFSET. In this case the split is bounded. If
the starting offset equals the stopping offset, the split is empty and its
partition is unassigned.
2. When the stopping offset is not set, or is set to an invalid negative
number, the stream is considered unbounded and the stopping offset is treated
as Long.MAX_VALUE. In this case no empty split is ever detected.
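As a rough illustration of the two cases in the summary, here is a minimal
sketch of the empty-split check. EmptySplitSketch and isEmptySplit are
hypothetical names, not Flink code. It assumes the stopping offset has already
been resolved to a concrete value (or left unset), and it uses ">=" rather
than strict equality as the comparison, which is an assumption of this sketch.

```java
// Hypothetical stand-in, not part of Flink.
class EmptySplitSketch {
    // Value quoted above for "no stopping offset".
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    // A split counts as empty when the starting position has already
    // reached the effective stopping offset. An unset (negative) stopping
    // offset is treated as Long.MAX_VALUE, i.e. unbounded.
    static boolean isEmptySplit(long startingOffset, long stoppingOffset) {
        long effective = stoppingOffset >= 0 ? stoppingOffset : Long.MAX_VALUE;
        return startingOffset >= effective;
    }

    public static void main(String[] args) {
        // Bounded, start == stop: empty, the partition would be unassigned.
        System.out.println(isEmptySplit(5L, 5L));
        // Bounded, data still to read: not empty.
        System.out.println(isEmptySplit(0L, 10L));
        // Unbounded: never empty, even if no data has arrived yet.
        System.out.println(isEmptySplit(5L, NO_STOPPING_OFFSET));
    }
}
```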
> Uncatch IllegalStateException found when log split changes handling result in
> KafkaPartitionSplitReader
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-29128
> URL: https://issues.apache.org/jira/browse/FLINK-29128
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.5, 1.15.2
> Reporter: Leo zhang
> Assignee: Leo zhang
> Priority: Minor
> Labels: pull-request-available
>
> When the logger is set to debug level,
> KafkaPartitionSplitReader#maybeLogSplitChangesHandlingResult logs the handling
> result of every SplitsChange<KafkaPartitionSplit>. The handling result
> includes the Kafka partition's starting offset, which is obtained from the
> Kafka API (consumer.position).
> When a SplitsChange<KafkaPartitionSplit> contains an empty split, that split
> is removed (its partition is unassigned), and consumer.position then throws
> IllegalStateException, since the position can only be checked for partitions
> assigned to the consumer. This exception is not caught and is rethrown as a
> RuntimeException, which causes the application's execution to fail.