[ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ran Tao updated FLINK-27529:
----------------------------
    Description: 
Currently, HybridSourceSplitEnumerator compares readerSourceIndex values, which have type Integer, using the == operator.


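By definition, a hybrid source can concatenate more than two child sources. The current code works only because of the Integer cache, which by default covers values from -128 to 127. Once a source index exceeds 127, == can compare two distinct Integer objects that hold the same value and return false. In short, we cannot use == to compare Integer indices unless we limit a hybrid source to source indices of at most 127.

For example: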
{code:java}
        Integer i1 = 128;
        Integer i2 = 128;
        System.out.println(i1 == i2);
        int i3 = 128;
        int i4 = 128;
        System.out.println((Integer) i3 == (Integer) i4);
{code}

With the default JVM settings, this prints false twice.
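To make the cache boundary explicit, here is a minimal standalone sketch contrasting reference comparison with value comparison. The class IntegerCacheDemo and its methods are illustrative only and are not part of Flink. Note that the upper bound of the cache is not fixed: it can be raised with the HotSpot option -XX:AutoBoxCacheMax, which is one more reason not to rely on == for Integer values.

{code:java}
import java.util.Objects;

// Illustrative only: not part of Flink.
public class IntegerCacheDemo {

    // Reference comparison: what == / != does when both operands are Integer objects.
    public static boolean sameReference(Integer x, Integer y) {
        return x == y;
    }

    // Value comparison: null-safe and correct for any int value.
    public static boolean sameValue(Integer x, Integer y) {
        return Objects.equals(x, y);
    }

    public static void main(String[] args) {
        // Autoboxing calls Integer.valueOf, which always returns cached
        // instances for -128..127, so == happens to work for small indices.
        System.out.println(sameReference(127, 127)); // true

        // Above the cache range, valueOf returns a new object per call
        // (default JVM settings), so == compares two distinct references.
        System.out.println(sameReference(128, 128)); // false with default JVM settings

        // Value comparison gives the expected answer regardless of the cache.
        System.out.println(sameValue(128, 128));     // true
    }
}
{code}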

The affected Integer index comparisons in HybridSourceSplitEnumerator are shown below:
{code:java}
@Override
public Map<Integer, ReaderInfo> registeredReaders() {
    ....
    Integer lastIndex = null;
    for (Integer sourceIndex : readerSourceIndex.values()) {
        // compares Integer references, not values
        if (lastIndex != null && lastIndex != sourceIndex) {
            return filterRegisteredReaders(readers);
        }
        lastIndex = sourceIndex;
    }
    return readers;
}

private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
    Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
    for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
        // boxes sourceIndex, then compares Integer references, not values
        if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
            readersForSource.put(e.getKey(), e.getValue());
        }
    }
    return readersForSource;
}
{code}
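One way to fix both comparisons is to compare by value. The following is a simplified, hypothetical sketch, not the actual Flink patch: ReaderFilterSketch and allReadersOnSameSource are invented names, the generic type R stands in for ReaderInfo so the sketch compiles without Flink, and the readerSourceIndex map is passed in directly instead of living in the enumerator's context proxy.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for the context proxy in HybridSourceSplitEnumerator.
public class ReaderFilterSketch {
    private final int sourceIndex;                         // index of the current child source
    private final Map<Integer, Integer> readerSourceIndex; // subtask id -> source index of that reader

    public ReaderFilterSketch(int sourceIndex, Map<Integer, Integer> readerSourceIndex) {
        this.sourceIndex = sourceIndex;
        this.readerSourceIndex = readerSourceIndex;
    }

    // Mirrors the loop in registeredReaders(): true if all readers report the same source index.
    public boolean allReadersOnSameSource() {
        Integer lastIndex = null;
        for (Integer index : readerSourceIndex.values()) {
            // Objects.equals compares values, so indices above 127 behave correctly.
            if (lastIndex != null && !Objects.equals(lastIndex, index)) {
                return false;
            }
            lastIndex = index;
        }
        return true;
    }

    // Mirrors filterRegisteredReaders(): keeps readers whose source index equals sourceIndex.
    public <R> Map<Integer, R> filterRegisteredReaders(Map<Integer, R> readers) {
        Map<Integer, R> readersForSource = new HashMap<>(readers.size());
        for (Map.Entry<Integer, R> e : readers.entrySet()) {
            // sourceIndex is boxed here, but equals() compares the int values;
            // a reader with no recorded index (null) is simply skipped.
            if (Objects.equals(readerSourceIndex.get(e.getKey()), sourceIndex)) {
                readersForSource.put(e.getKey(), e.getValue());
            }
        }
        return readersForSource;
    }
}
{code}

An equivalent alternative for the filter is to unbox explicitly, e.g. checking that the looked-up Integer is non-null and then comparing it with the int field, since == between an Integer and an int compares values.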


  was:
Currently, HybridSourceSplitEnumerator compares readerSourceIndex values, which have type Integer, using the == operator.


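By definition, a hybrid source can concatenate more than two child sources. The current code works only because of the Integer cache, which by default covers values from -128 to 127. Once a source index exceeds 127, == can compare two distinct Integer objects that hold the same value and return false. In short, we cannot use == to compare Integer indices unless we limit a hybrid source to source indices of at most 127.

For example: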
{code:java}
        Integer i1 = 128;
        Integer i2 = 128;
        System.out.println(i1 == i2);
        int i3 = 128;
        int i4 = 128;
        System.out.println((Integer) i3 == (Integer) i4);
{code}

With the default JVM settings, this prints false twice.

{code:java}
@Override
public Map<Integer, ReaderInfo> registeredReaders() {
    // TODO: not start enumerator until readers are ready?
    Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
    if (readers.size() != readerSourceIndex.size()) {
        return filterRegisteredReaders(readers);
    }
    Integer lastIndex = null;
    for (Integer sourceIndex : readerSourceIndex.values()) {
        // compares Integer references, not values
        if (lastIndex != null && lastIndex != sourceIndex) {
            return filterRegisteredReaders(readers);
        }
        lastIndex = sourceIndex;
    }
    return readers;
}

private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
    Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
    for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
        // boxes sourceIndex, then compares Integer references, not values
        if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
            readersForSource.put(e.getKey(), e.getValue());
        }
    }
    return readersForSource;
}
{code}



> HybridSourceSplitEnumerator sourceIndex using error Integer check
> -----------------------------------------------------------------
>
>                 Key: FLINK-27529
>                 URL: https://issues.apache.org/jira/browse/FLINK-27529
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.15.0, 1.14.4, 1.15.1
>            Reporter: Ran Tao
>            Assignee: Ran Tao
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
