[ 
https://issues.apache.org/jira/browse/FLINK-29128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598157#comment-17598157
 ] 

Leo zhang commented on FLINK-29128:
-----------------------------------

Actually, in the unbounded case we do assign empty splits, since they may 
eventually contain data. The notion of an "empty split" applies only to the 
bounded case; in the unbounded case, a split with no data yet is not 
considered empty.

Here is the evidence:

1. By default, if no stopping offset is passed to the constructor, the 
stopping offset is set to NO_STOPPING_OFFSET, which is defined as 
Long.MIN_VALUE, a negative number.

    public KafkaPartitionSplit(TopicPartition tp, long startingOffset) {
        this(tp, startingOffset, NO_STOPPING_OFFSET);
    }

2. In KafkaPartitionSplit#getStoppingOffset, any negative number other than 
LATEST_OFFSET (-1) or COMMITTED_OFFSET (-3) is treated as no stopping offset, 
so the method returns Optional.empty().

    public Optional<Long> getStoppingOffset() {
        return stoppingOffset >= 0
                        || stoppingOffset == LATEST_OFFSET // -1
                        || stoppingOffset == COMMITTED_OFFSET // -3
                ? Optional.of(stoppingOffset)
                : Optional.empty();
    }
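
To make point 2 concrete, here is a minimal, self-contained sketch of the same check. StoppingOffsetSketch is a hypothetical stand-in written for this comment, not the Flink class; only the three constant values and the ternary are taken from the snippets above.

```java
import java.util.Optional;

// Hypothetical stand-in for KafkaPartitionSplit; models only the stopping-offset check.
final class StoppingOffsetSketch {
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
    static final long LATEST_OFFSET = -1L;
    static final long COMMITTED_OFFSET = -3L;

    // Same condition as the getStoppingOffset snippet quoted above.
    static Optional<Long> getStoppingOffset(long stoppingOffset) {
        return stoppingOffset >= 0
                        || stoppingOffset == LATEST_OFFSET
                        || stoppingOffset == COMMITTED_OFFSET
                ? Optional.of(stoppingOffset)
                : Optional.empty();
    }

    public static void main(String[] args) {
        // No stopping offset configured: treated as unbounded.
        System.out.println(getStoppingOffset(NO_STOPPING_OFFSET).isPresent());
        // A concrete offset or one of the two markers: treated as bounded.
        System.out.println(getStoppingOffset(100L).isPresent());
        System.out.println(getStoppingOffset(LATEST_OFFSET).isPresent());
        // Any other negative value also yields Optional.empty().
        System.out.println(getStoppingOffset(-2L).isPresent());
    }
}
```

Any negative value other than the two markers falls through to Optional.empty(), which is why an unset stopping offset and an invalid negative one behave the same way downstream.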

3. In KafkaPartitionSplitReader#parseStoppingOffsets, the stopping offset is 
parsed only inside split.getStoppingOffset().ifPresent(...). This means that 
if no stopping offset is set, or the stopping offset is set to an invalid 
negative number, no stopping offset is parsed.

    private void parseStoppingOffsets(
            KafkaPartitionSplit split,
            List<TopicPartition> partitionsStoppingAtLatest,
            Set<TopicPartition> partitionsStoppingAtCommitted) {
        TopicPartition tp = split.getTopicPartition();
        split.getStoppingOffset()
                .ifPresent(
                        stoppingOffset -> {
                            if (stoppingOffset >= 0) {
                                stoppingOffsets.put(tp, stoppingOffset);
                            } else if (stoppingOffset == KafkaPartitionSplit.LATEST_OFFSET) {
                                partitionsStoppingAtLatest.add(tp);
                            } else if (stoppingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
                                partitionsStoppingAtCommitted.add(tp);
                            } else {
                                // This should not happen.
                                throw new FlinkRuntimeException(
                                        String.format(
                                                "Invalid stopping offset %d for partition %s",
                                                stoppingOffset, tp));
                            }
                        });
    }

4. If the stopping offset is LATEST_OFFSET or COMMITTED_OFFSET, it is resolved 
to the actual offset in KafkaPartitionSplitReader#acquireAndSetStoppingOffsets.

5. In KafkaPartitionSplitReader#getStoppingOffset, the default stopping offset 
is Long.MAX_VALUE. This means that if no stopping offset is set, the split's 
stopping offset is effectively Long.MAX_VALUE, so the stream behaves as 
unbounded and runs until the offset reaches Long.MAX_VALUE.

    private long getStoppingOffset(TopicPartition tp) {
        return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
    }
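
The default lookup in point 5 can be illustrated with a small sketch. This is an assumption-laden simplification: partitions are keyed by a plain String instead of TopicPartition so the example needs no Kafka dependency, and the class name, setStoppingOffset and hasReachedStoppingOffset are hypothetical helpers added only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-partition stopping-offset lookup.
final class DefaultStoppingOffsetSketch {
    // Only partitions with a parsed stopping offset get an entry.
    private final Map<String, Long> stoppingOffsets = new HashMap<>();

    void setStoppingOffset(String partition, long offset) {
        stoppingOffsets.put(partition, offset);
    }

    // Mirrors the quoted getStoppingOffset: a missing entry means Long.MAX_VALUE.
    long getStoppingOffset(String partition) {
        return stoppingOffsets.getOrDefault(partition, Long.MAX_VALUE);
    }

    // Hypothetical helper: has the reader position reached the stopping offset?
    boolean hasReachedStoppingOffset(String partition, long position) {
        return position >= getStoppingOffset(partition);
    }

    public static void main(String[] args) {
        DefaultStoppingOffsetSketch reader = new DefaultStoppingOffsetSketch();
        reader.setStoppingOffset("topic-0", 50L);
        System.out.println(reader.hasReachedStoppingOffset("topic-0", 50L));
        System.out.println(reader.hasReachedStoppingOffset("topic-1", 50L));
    }
}
```

For a partition without an entry, no realistic position reaches Long.MAX_VALUE, so the split is never considered finished.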

To summarize:

1. A stopping offset is parsed and set only when stoppingOffset >= 0 or when 
it equals LATEST_OFFSET or COMMITTED_OFFSET. In this case the split is 
bounded. If the starting offset equals the stopping offset, the split is 
empty, and its partition will be unassigned.

2. When the stopping offset is not set, or is set to an invalid negative 
number, the stream is considered unbounded and the stopping offset is treated 
as Long.MAX_VALUE. In this case, no empty split will be found.
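
The two cases in this summary can be sketched as a single check. This is a simplification under stated assumptions: EmptySplitSketch and isEmptySplit are hypothetical names, the stopping offset is passed in already resolved (LATEST_OFFSET and COMMITTED_OFFSET replaced by concrete offsets), and the comparison uses >= so that it also covers a starting offset beyond the stopping offset.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the empty-split decision described in the summary.
final class EmptySplitSketch {
    // resolvedStoppingOffset is empty when no valid stopping offset was parsed.
    static boolean isEmptySplit(long startingOffset, OptionalLong resolvedStoppingOffset) {
        // Unbounded splits fall back to Long.MAX_VALUE, as in point 5.
        long effectiveStop = resolvedStoppingOffset.orElse(Long.MAX_VALUE);
        return startingOffset >= effectiveStop;
    }

    public static void main(String[] args) {
        // Bounded, start == stop: empty, so the partition would be unassigned.
        System.out.println(isEmptySplit(42L, OptionalLong.of(42L)));
        // Bounded, start < stop: there is still data to read.
        System.out.println(isEmptySplit(10L, OptionalLong.of(42L)));
        // Unbounded: never empty, since data may still arrive.
        System.out.println(isEmptySplit(42L, OptionalLong.empty()));
    }
}
```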

> Uncatch IllegalStateException found when log split changes handling result in 
> KafkaPartitionSplitReader
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29128
>                 URL: https://issues.apache.org/jira/browse/FLINK-29128
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.5, 1.15.2
>            Reporter: Leo zhang
>            Assignee: Leo zhang
>            Priority: Minor
>              Labels: pull-request-available
>
> When the logger is set to debug level, 
> KafkaPartitionSplitReader#maybeLogSplitChangesHandingResult logs the handling 
> result of every SplitsChange<KafkaPartitionSplit>. The handling result 
> includes each Kafka partition's starting offset, which is obtained from the 
> Kafka API (consumer.position).
> When a SplitsChange<KafkaPartitionSplit> contains an empty split, the split 
> is removed (its partition is unassigned), and consumer.position then throws 
> IllegalStateException, since the position can only be checked for partitions 
> assigned to the consumer. This exception is not caught and is rethrown as a 
> RuntimeException, which causes the application's execution to fail.
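
The failure described in the quoted issue can be reproduced in miniature, together with one possible way to guard the debug logging. This is only an illustrative sketch, not necessarily the fix adopted for this issue: FakeConsumer is a hypothetical stand-in that throws IllegalStateException for unassigned partitions, as the description says consumer.position does, and describeSplits is a made-up helper.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SafePositionLoggingSketch {
    // Hypothetical stand-in for a consumer; not the Kafka client API.
    static final class FakeConsumer {
        private final Set<String> assigned = new HashSet<>();

        void assign(String partition) {
            assigned.add(partition);
        }

        void unassign(String partition) {
            assigned.remove(partition);
        }

        long position(String partition) {
            if (!assigned.contains(partition)) {
                throw new IllegalStateException(
                        "Position can only be checked for assigned partitions: " + partition);
            }
            return 0L; // Fixed position, enough for the illustration.
        }
    }

    // Builds one log line per partition without failing on unassigned ones.
    static List<String> describeSplits(FakeConsumer consumer, List<String> partitions) {
        List<String> lines = new ArrayList<>();
        for (String partition : partitions) {
            try {
                lines.add(partition + ": starting offset " + consumer.position(partition));
            } catch (IllegalStateException e) {
                // The partition was unassigned, e.g. because its split was empty.
                lines.add(partition + ": unassigned");
            }
        }
        return lines;
    }
}
```

An alternative with the same effect would be to skip the partitions of removed splits before asking for their position, which avoids relying on the exception at all.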



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
