[ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ran Tao updated FLINK-27529:
----------------------------
    Description: 
Currently, HybridSourceSplitEnumerator compares readerSourceIndex values of type Integer with the == operator, which compares object references rather than values.

By definition, a hybrid source can concatenate more than 2 child sources. The check currently works only because of the Integer cache, which by default covers values from -128 to 127; once a source index exceeds 127, the comparison fails. In short, we can't use == to compare Integer indexes unless we limit hybrid sources to indexes <= 127.

e.g.
{code:java}
Integer i1 = 128;
Integer i2 = 128;
System.out.println(i1 == i2);
int i3 = 128;
int i4 = 128;
System.out.println((Integer) i3 == (Integer) i4);
{code}
With the default cache size, this prints false, false.

The HybridSource Integer index comparison is below:
{code:java}
@Override
public Map<Integer, ReaderInfo> registeredReaders() {
    ....
    Integer lastIndex = null;
    for (Integer sourceIndex : readerSourceIndex.values()) {
        if (lastIndex != null && lastIndex != sourceIndex) {
            return filterRegisteredReaders(readers);
        }
        lastIndex = sourceIndex;
    }
    return readers;
}

private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
    Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
    for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
        if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
            readersForSource.put(e.getKey(), e.getValue());
        }
    }
    return readersForSource;
}
{code}

  was:
Currently, HybridSourceSplitEnumerator compares readerSourceIndex values of type Integer with the == operator, which compares object references rather than values.

By definition, a hybrid source can concatenate more than 2 child sources. The check currently works only because of the Integer cache, which by default covers values from -128 to 127; once a source index exceeds 127, the comparison fails. In short, we can't use == to compare Integer indexes unless we limit hybrid sources to indexes <= 127.

e.g.
{code:java}
Integer i1 = 128;
Integer i2 = 128;
System.out.println(i1 == i2);
int i3 = 128;
int i4 = 128;
System.out.println((Integer) i3 == (Integer) i4);
{code}
With the default cache size, this prints false, false.
{code:java}
@Override
public Map<Integer, ReaderInfo> registeredReaders() {
    // TODO: not start enumerator until readers are ready?
    Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
    if (readers.size() != readerSourceIndex.size()) {
        return filterRegisteredReaders(readers);
    }
    Integer lastIndex = null;
    for (Integer sourceIndex : readerSourceIndex.values()) {
        if (*lastIndex != null && lastIndex != sourceIndex*) {
            return filterRegisteredReaders(readers);
        }
        lastIndex = sourceIndex;
    }
    return readers;
}

private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
    Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
    for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
        if (*readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex*) {
            readersForSource.put(e.getKey(), e.getValue());
        }
    }
    return readersForSource;
}
{code}


> HybridSourceSplitEnumerator sourceIndex using error Integer check
> -----------------------------------------------------------------
>
>                 Key: FLINK-27529
>                 URL: https://issues.apache.org/jira/browse/FLINK-27529
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.15.0, 1.14.4, 1.15.1
>            Reporter: Ran Tao
>            Assignee: Ran Tao
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, HybridSourceSplitEnumerator compares readerSourceIndex values of
> type Integer with the == operator, which compares object references rather
> than values.
> By definition, a hybrid source can concatenate more than 2 child sources. The
> check currently works only because of the Integer cache, which by default
> covers values from -128 to 127; once a source index exceeds 127, the
> comparison fails. In short, we can't use == to compare Integer indexes unless
> we limit hybrid sources to indexes <= 127.
> e.g.
> {code:java}
> Integer i1 = 128;
> Integer i2 = 128;
> System.out.println(i1 == i2);
> int i3 = 128;
> int i4 = 128;
> System.out.println((Integer) i3 == (Integer) i4);
> {code}
> With the default cache size, this prints false, false.
> HybridSource Integer index comparison is below:
> {code:java}
> @Override
> public Map<Integer, ReaderInfo> registeredReaders() {
>     ....
>     Integer lastIndex = null;
>     for (Integer sourceIndex : readerSourceIndex.values()) {
>         if (lastIndex != null && lastIndex != sourceIndex) {
>             return filterRegisteredReaders(readers);
>         }
>         lastIndex = sourceIndex;
>     }
>     return readers;
> }
>
> private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
>     Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
>     for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
>         if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>             readersForSource.put(e.getKey(), e.getValue());
>         }
>     }
>     return readersForSource;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
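
For illustration only, here is a minimal sketch of how the two comparisons quoted above could be made value-based. It is not the patch attached to this issue. The class name `IntegerIndexComparison` and the methods `allSameIndex` and `filterReaders` are hypothetical, `String` stands in for Flink's `ReaderInfo`, and the current source index is passed in as a plain `int` parameter instead of being read from an enumerator field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not the actual HybridSourceSplitEnumerator code.
class IntegerIndexComparison {

    // Returns true when all readers report the same source index.
    static boolean allSameIndex(Map<Integer, Integer> readerSourceIndex) {
        Integer lastIndex = null;
        for (Integer sourceIndex : readerSourceIndex.values()) {
            // equals() compares numeric values, so the result does not depend
            // on whether the boxed objects come from the Integer cache.
            if (lastIndex != null && !lastIndex.equals(sourceIndex)) {
                return false;
            }
            lastIndex = sourceIndex;
        }
        return true;
    }

    // Keeps only the readers whose recorded source index equals currentSourceIndex.
    // String is a placeholder for ReaderInfo (assumption for this sketch).
    static Map<Integer, String> filterReaders(
            Map<Integer, String> readers,
            Map<Integer, Integer> readerSourceIndex,
            int currentSourceIndex) {
        Map<Integer, String> readersForSource = new HashMap<>(readers.size());
        for (Map.Entry<Integer, String> e : readers.entrySet()) {
            Integer index = readerSourceIndex.get(e.getKey());
            // Null check first, then unbox explicitly and compare primitive ints.
            if (index != null && index.intValue() == currentSourceIndex) {
                readersForSource.put(e.getKey(), e.getValue());
            }
        }
        return readersForSource;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> readerSourceIndex = new HashMap<>();
        readerSourceIndex.put(0, 128);
        readerSourceIndex.put(1, 128);
        readerSourceIndex.put(2, 129);

        Map<Integer, String> readers = new HashMap<>();
        readers.put(0, "reader-0");
        readers.put(1, "reader-1");
        readers.put(2, "reader-2");

        System.out.println(allSameIndex(readerSourceIndex));
        System.out.println(filterReaders(readers, readerSourceIndex, 128).keySet());
    }
}
```

In this sketch, readers 0 and 1 are kept for source index 128 even though that value lies outside the default Integer cache range. `Objects.equals(a, b)` would be another null-safe option for the same comparison.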