Yuhao Bi created FLINK-25481:
--------------------------------

             Summary: SourceIndex comparison in SplitEnumeratorContextProxy
                 Key: FLINK-25481
                 URL: https://issues.apache.org/jira/browse/FLINK-25481
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.2, 1.13.5, 1.15.0
            Reporter: Yuhao Bi


In 
[HybridSourceSplitEnumerator.java|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java]

the sourceIndex is used by value, but in the following block the boxed Integer 
values are compared by reference (with '==' and '!=') rather than by value:
{code:java}
@Override
public Map<Integer, ReaderInfo> registeredReaders() {
    // TODO: not start enumerator until readers are ready?
    Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
    if (readers.size() != readerSourceIndex.size()) {
        return filterRegisteredReaders(readers);
    }
    Integer lastIndex = null;
    for (Integer sourceIndex : readerSourceIndex.values()) {
        // Integer references are compared with the '!=' operator
        if (lastIndex != null && lastIndex != sourceIndex) {
            return filterRegisteredReaders(readers);
        }
        lastIndex = sourceIndex;
    }
    return readers;
}

private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
    Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
    for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
        // sourceIndex is cast to Integer, then compared with the '==' operator
        if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
            readersForSource.put(e.getKey(), e.getValue());
        }
    }
    return readersForSource;
}
 {code}
Java caches boxed Integer values in the range -128 to 127, so the code works 
as long as the source indexes stay within that range. If my understanding is 
correct, the comparisons would be better replaced with .equals() calls.
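
To illustrate the difference, here is a minimal, self-contained sketch. It is not the Flink code: the class SourceIndexComparison and its helper methods are hypothetical names made up for this example. It compares boxed Integers by reference and by value, and shows how the loop from registeredReaders() might look with a value comparison.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;

// Hypothetical demo class; none of these names exist in Flink.
public class SourceIndexComparison {

    /** Mirrors the reported check: compares boxed Integers by reference. */
    static boolean sameByReference(Integer a, Integer b) {
        return a == b;
    }

    /** Possible replacement: compares by value and tolerates null. */
    static boolean sameByValue(Integer a, Integer b) {
        return Objects.equals(a, b);
    }

    /** Same shape as the loop in registeredReaders(), using a value comparison. */
    static boolean allSameIndex(Collection<Integer> sourceIndexes) {
        Integer lastIndex = null;
        for (Integer sourceIndex : sourceIndexes) {
            if (lastIndex != null && !lastIndex.equals(sourceIndex)) {
                return false;
            }
            lastIndex = sourceIndex;
        }
        return true;
    }

    public static void main(String[] args) {
        // Inside the cache range (-128..127) autoboxing reuses one instance,
        // so the reference comparison happens to agree with the value comparison.
        Integer a = 100;
        Integer b = 100;
        System.out.println("100 by reference: " + sameByReference(a, b));
        System.out.println("100 by value:     " + sameByValue(a, b));

        // Outside the cache range the reference comparison is not reliable;
        // with default JVM settings it usually reports a mismatch.
        Integer c = 1000;
        Integer d = 1000;
        System.out.println("1000 by reference: " + sameByReference(c, d));
        System.out.println("1000 by value:     " + sameByValue(c, d));

        System.out.println("all same: " + allSameIndex(Arrays.asList(1000, 1000, 1000)));
    }
}
```

Objects.equals is used in sameByValue because readerSourceIndex.get(...) can return null for an unknown key. A plain .equals call on a non-null left operand, as in allSameIndex, would work as well.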

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
