[ 
https://issues.apache.org/jira/browse/FLINK-27381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-27381:
----------------------------
    Description: 
 

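HybridSourceSplit's wrapped split has been changed from a SourceSplit object to 
a serialized byte array (wrappedSplitBytes), but the hashCode method was not 
updated to match. Objects.hash(wrappedSplitBytes, sourceIndex) hashes the array 
by reference identity rather than by its contents, so hashCode should use 
Arrays.hashCode for the serialized wrappedSplitBytes.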

 
public int hashCode() {
        return Objects.hash(wrappedSplitBytes, sourceIndex);
    }
 
{code:java}
// old
public class HybridSourceSplit implements SourceSplit {   
    private final SourceSplit wrappedSplit;
    private final int sourceIndex; 
    public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
        this.sourceIndex = sourceIndex;
        this.wrappedSplit = wrappedSplit;
    }
    public int sourceIndex() {
        return this.sourceIndex;
    }
    public SourceSplit getWrappedSplit() {
        return wrappedSplit;
    }
    @Override
    public int hashCode() {
        return Objects.hash(wrappedSplit, sourceIndex);
    }    ...
}     {code}
 
{code:java}
// current(master)
public class HybridSourceSplit implements SourceSplit {
    private final byte[] wrappedSplitBytes;
    private final int wrappedSplitSerializerVersion;
    private final int sourceIndex;
    private final String splitId;
    public HybridSourceSplit(
            int sourceIndex, byte[] wrappedSplit, int serializerVersion, String 
splitId) {
        this.sourceIndex = sourceIndex;
        this.wrappedSplitBytes = wrappedSplit;
        this.wrappedSplitSerializerVersion = serializerVersion;
        this.splitId = splitId;
    }
    public int sourceIndex() {
        return this.sourceIndex;
    }
    public byte[] wrappedSplitBytes() {
        return wrappedSplitBytes;
    }
    @Override
    public int hashCode() {
        return Objects.hash(wrappedSplitBytes, sourceIndex);
    }
    ...
 }    {code}
Detailed diff: 

[https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]

 

Related issue:

https://issues.apache.org/jira/browse/FLINK-24064

 

  was:
 

HybridSourceSplit's wrappedSplit  has been changed from SourceSplit to 
serialized wrappedSplitsBytes array. but hashcode methods did not match the 
byte array. however here we should use Arrays.hashcode for serialized 
wrappedSplitsBytes.

 
{code:java}
// old
public class HybridSourceSplit implements SourceSplit {   
    private final SourceSplit wrappedSplit;
    private final int sourceIndex; 
    public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
        this.sourceIndex = sourceIndex;
        this.wrappedSplit = wrappedSplit;
    }
    public int sourceIndex() {
        return this.sourceIndex;
    }
    public SourceSplit getWrappedSplit() {
        return wrappedSplit;
    }
    @Override
    public int hashCode() {
        return Objects.hash(wrappedSplit, sourceIndex);
    }    ...
}     {code}
 
{code:java}
// current(master)
public class HybridSourceSplit implements SourceSplit {
    private final byte[] wrappedSplitBytes;
    private final int wrappedSplitSerializerVersion;
    private final int sourceIndex;
    private final String splitId;
    public HybridSourceSplit(
            int sourceIndex, byte[] wrappedSplit, int serializerVersion, String 
splitId) {
        this.sourceIndex = sourceIndex;
        this.wrappedSplitBytes = wrappedSplit;
        this.wrappedSplitSerializerVersion = serializerVersion;
        this.splitId = splitId;
    }
    public int sourceIndex() {
        return this.sourceIndex;
    }
    public byte[] wrappedSplitBytes() {
        return wrappedSplitBytes;
    }
    @Override
    public int hashCode() {
        return Objects.hash(wrappedSplitBytes, sourceIndex);
    }
    ...
 }    {code}
detail diff: 

[https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]

 

detail issue:

https://issues.apache.org/jira/browse/FLINK-24064

 


> HybridSource split should use Arrays.hashcode
> ---------------------------------------------
>
>                 Key: FLINK-27381
>                 URL: https://issues.apache.org/jira/browse/FLINK-27381
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.14.4
>            Reporter: Ran Tao
>            Priority: Major
>
>  
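> HybridSourceSplit's wrapped split has been changed from a SourceSplit object to 
> a serialized byte array (wrappedSplitBytes), but the hashCode method was not 
> updated to match. Objects.hash(wrappedSplitBytes, sourceIndex) hashes the array 
> by reference identity rather than by its contents, so hashCode should use 
> Arrays.hashCode for the serialized wrappedSplitBytes.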
>  
> public int hashCode() {
>         return Objects.hash(wrappedSplitBytes, sourceIndex);
>     }
>  
> {code:java}
> // old
> public class HybridSourceSplit implements SourceSplit {   
>     private final SourceSplit wrappedSplit;
>     private final int sourceIndex; 
>     public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
>         this.sourceIndex = sourceIndex;
>         this.wrappedSplit = wrappedSplit;
>     }
>     public int sourceIndex() {
>         return this.sourceIndex;
>     }
>     public SourceSplit getWrappedSplit() {
>         return wrappedSplit;
>     }
>     @Override
>     public int hashCode() {
>         return Objects.hash(wrappedSplit, sourceIndex);
>     }    ...
> }     {code}
>  
> {code:java}
> // current(master)
> public class HybridSourceSplit implements SourceSplit {
>     private final byte[] wrappedSplitBytes;
>     private final int wrappedSplitSerializerVersion;
>     private final int sourceIndex;
>     private final String splitId;
>     public HybridSourceSplit(
>             int sourceIndex, byte[] wrappedSplit, int serializerVersion, 
> String splitId) {
>         this.sourceIndex = sourceIndex;
>         this.wrappedSplitBytes = wrappedSplit;
>         this.wrappedSplitSerializerVersion = serializerVersion;
>         this.splitId = splitId;
>     }
>     public int sourceIndex() {
>         return this.sourceIndex;
>     }
>     public byte[] wrappedSplitBytes() {
>         return wrappedSplitBytes;
>     }
>     @Override
>     public int hashCode() {
>         return Objects.hash(wrappedSplitBytes, sourceIndex);
>     }
>     ...
>  }    {code}
> detail diff: 
> [https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
>  
> detail issue:
> https://issues.apache.org/jira/browse/FLINK-24064
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to