[
https://issues.apache.org/jira/browse/FLINK-27381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao updated FLINK-27381:
----------------------------
Description:
HybridSourceSplit's wrappedSplit has been changed from SourceSplit to
serialized wrappedSplitsBytes array. but hashcode methods did not match the
byte array. however here we should use Arrays.hashcode for serialized
wrappedSplitsBytes.
{code:java}
public int hashCode(){
return Objects.hash(wrappedSplitBytes, sourceIndex);
} {code}
{code:java}
// old
public class HybridSourceSplit implements SourceSplit {
private final SourceSplit wrappedSplit;
private final int sourceIndex;
public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
this.sourceIndex = sourceIndex;
this.wrappedSplit = wrappedSplit;
}
public int sourceIndex() {
return this.sourceIndex;
}
public SourceSplit getWrappedSplit() {
return wrappedSplit;
}
@Override
public int hashCode() {
return Objects.hash(wrappedSplit, sourceIndex);
} ...
} {code}
{code:java}
// current(master branch)
public class HybridSourceSplit implements SourceSplit {
private final byte[] wrappedSplitBytes;
private final int wrappedSplitSerializerVersion;
private final int sourceIndex;
private final String splitId;
public HybridSourceSplit(
int sourceIndex, byte[] wrappedSplit, int serializerVersion, String
splitId) {
this.sourceIndex = sourceIndex;
this.wrappedSplitBytes = wrappedSplit;
this.wrappedSplitSerializerVersion = serializerVersion;
this.splitId = splitId;
}
public int sourceIndex() {
return this.sourceIndex;
}
public byte[] wrappedSplitBytes() {
return wrappedSplitBytes;
}
@Override
public int hashCode() {
return Objects.hash(wrappedSplitBytes, sourceIndex);
}
...
} {code}
detail diff:
[https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
detail issue:
https://issues.apache.org/jira/browse/FLINK-24064
was:
HybridSourceSplit's wrappedSplit has been changed from SourceSplit to
serialized wrappedSplitsBytes array. but hashcode methods did not match the
byte array. however here we should use Arrays.hashcode for serialized
wrappedSplitsBytes.
{code:java}
public int hashCode(){
return Objects.hash(wrappedSplitBytes, sourceIndex);
} {code}
{code:java}
// old
public class HybridSourceSplit implements SourceSplit {
private final SourceSplit wrappedSplit;
private final int sourceIndex;
public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
this.sourceIndex = sourceIndex;
this.wrappedSplit = wrappedSplit;
}
public int sourceIndex() {
return this.sourceIndex;
}
public SourceSplit getWrappedSplit() {
return wrappedSplit;
}
@Override
public int hashCode() {
return Objects.hash(wrappedSplit, sourceIndex);
} ...
} {code}
{code:java}
{code}
// current(master)
public class HybridSourceSplit implements SourceSplit {
private final byte[] wrappedSplitBytes;
private final int wrappedSplitSerializerVersion;
private final int sourceIndex;
private final String splitId;
public HybridSourceSplit(
int sourceIndex, byte[] wrappedSplit, int serializerVersion, String
splitId) \{ this.sourceIndex = sourceIndex;
this.wrappedSplitBytes = wrappedSplit;
this.wrappedSplitSerializerVersion = serializerVersion; this.splitId =
splitId; }
public int sourceIndex() \{ return this.sourceIndex; }
public byte[] wrappedSplitBytes() \{ return wrappedSplitBytes; }
@Override
public int hashCode() \{ return Objects.hash(wrappedSplitBytes,
sourceIndex); }
...
}
{code:java}
{code}
detail diff:
[https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
detail issue:
https://issues.apache.org/jira/browse/FLINK-24064
> HybridSource split should use Arrays.hashcode
> ---------------------------------------------
>
> Key: FLINK-27381
> URL: https://issues.apache.org/jira/browse/FLINK-27381
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.14.4
> Reporter: Ran Tao
> Priority: Major
>
>
> HybridSourceSplit's wrappedSplit has been changed from SourceSplit to
> serialized wrappedSplitsBytes array. but hashcode methods did not match the
> byte array. however here we should use Arrays.hashcode for serialized
> wrappedSplitsBytes.
>
> {code:java}
> public int hashCode(){
> return Objects.hash(wrappedSplitBytes, sourceIndex);
> } {code}
>
> {code:java}
> // old
> public class HybridSourceSplit implements SourceSplit {
> private final SourceSplit wrappedSplit;
> private final int sourceIndex;
> public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
> this.sourceIndex = sourceIndex;
> this.wrappedSplit = wrappedSplit;
> }
> public int sourceIndex() {
> return this.sourceIndex;
> }
> public SourceSplit getWrappedSplit() {
> return wrappedSplit;
> }
> @Override
> public int hashCode() {
> return Objects.hash(wrappedSplit, sourceIndex);
> } ...
> } {code}
>
> {code:java}
> // current(master branch)
> public class HybridSourceSplit implements SourceSplit {
> private final byte[] wrappedSplitBytes;
> private final int wrappedSplitSerializerVersion;
> private final int sourceIndex;
> private final String splitId;
> public HybridSourceSplit(
> int sourceIndex, byte[] wrappedSplit, int serializerVersion,
> String splitId) {
> this.sourceIndex = sourceIndex;
> this.wrappedSplitBytes = wrappedSplit;
> this.wrappedSplitSerializerVersion = serializerVersion;
> this.splitId = splitId;
> }
> public int sourceIndex() {
> return this.sourceIndex;
> }
> public byte[] wrappedSplitBytes() {
> return wrappedSplitBytes;
> }
> @Override
> public int hashCode() {
> return Objects.hash(wrappedSplitBytes, sourceIndex);
> }
> ...
> } {code}
> detail diff:
> [https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
>
> detail issue:
> https://issues.apache.org/jira/browse/FLINK-24064
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)