[
https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533590#comment-17533590
]
Ran Tao commented on FLINK-27529:
---------------------------------
[~thw] Thanks for reviewing. I have added some context for this issue, with more
details at https://github.com/apache/flink/pull/19654.
> HybridSourceSplitEnumerator sourceIndex using error Integer check
> -----------------------------------------------------------------
>
> Key: FLINK-27529
> URL: https://issues.apache.org/jira/browse/FLINK-27529
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.15.0, 1.14.4, 1.15.1
> Reporter: Ran Tao
> Assignee: Ran Tao
> Priority: Major
> Labels: pull-request-available
>
> Currently, HybridSourceSplitEnumerator compares readerSourceIndex values, which
> are of type Integer, using the == operator.
> By definition, a hybrid source can concatenate more than 2 child sources. The
> current check works only because of the Integer cache (which by default covers
> values from -128 to 127); with more sources, the comparison will fail. In short,
> we can't use == to compare Integer indexes unless we limit hybrid sources to
> indexes <= 127.
> e.g.
> {code:java}
> Integer i1 = 128;
> Integer i2 = 128;
> System.out.println(i1 == i2);
> int i3 = 128;
> int i4 = 128;
> System.out.println((Integer) i3 == (Integer) i4);
> {code}
> On a default JVM this prints false, false.
> {code:java}
> @Override
> public Map<Integer, ReaderInfo> registeredReaders() {
>     // TODO: not start enumerator until readers are ready?
>     Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
>     if (readers.size() != readerSourceIndex.size()) {
>         return filterRegisteredReaders(readers);
>     }
>     Integer lastIndex = null;
>     for (Integer sourceIndex : readerSourceIndex.values()) {
>         // Problem: != compares boxed Integer references, not values.
>         if (lastIndex != null && lastIndex != sourceIndex) {
>             return filterRegisteredReaders(readers);
>         }
>         lastIndex = sourceIndex;
>     }
>     return readers;
> }
>
> private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
>     Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
>     for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
>         // Problem: == compares boxed Integer references, not values.
>         if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>             readersForSource.put(e.getKey(), e.getValue());
>         }
>     }
>     return readersForSource;
> }
> {code}
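
For illustration only, here is a minimal sketch of a value-based comparison for
the filtering step quoted above. It is not the code from the pull request: the
class name SourceIndexComparison and the helper filterBySourceIndex are
hypothetical, and String stands in for ReaderInfo so that the snippet compiles
without any Flink dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SourceIndexComparison {

    /**
     * Hypothetical stand-in for filterRegisteredReaders: keeps the readers whose
     * recorded source index equals sourceIndex by value. String replaces ReaderInfo.
     */
    static Map<Integer, String> filterBySourceIndex(
            Map<Integer, Integer> readerSourceIndex,
            Map<Integer, String> readers,
            int sourceIndex) {
        Map<Integer, String> readersForSource = new HashMap<>(readers.size());
        for (Map.Entry<Integer, String> e : readers.entrySet()) {
            Integer readerIndex = readerSourceIndex.get(e.getKey());
            // Objects.equals is null-safe and delegates to Integer.equals, which
            // compares the numeric values rather than the object references.
            if (Objects.equals(readerIndex, sourceIndex)) {
                readersForSource.put(e.getKey(), e.getValue());
            }
        }
        return readersForSource;
    }

    public static void main(String[] args) {
        // Readers 0 and 1 are on source 128 (outside the default Integer cache),
        // reader 2 is still on source 127.
        Map<Integer, Integer> readerSourceIndex = new HashMap<>();
        readerSourceIndex.put(0, 128);
        readerSourceIndex.put(1, 128);
        readerSourceIndex.put(2, 127);

        Map<Integer, String> readers = new HashMap<>();
        readers.put(0, "reader-0");
        readers.put(1, "reader-1");
        readers.put(2, "reader-2");

        System.out.println(filterBySourceIndex(readerSourceIndex, readers, 128).keySet());
    }
}
```

The same idea applies to the loop in registeredReaders(): comparing with
equals (or unboxing to int first) does not depend on the Integer cache range.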
--
This message was sent by Atlassian Jira
(v8.20.7#820007)