[
https://issues.apache.org/jira/browse/FLINK-27381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao updated FLINK-27381:
----------------------------
Description:
HybridSourceSplit's wrappedSplit has been changed from SourceSplit to
serialized wrappedSplitsBytes array. but hashcode methods did not match the
byte array. however here we should use Arrays.hashcode for serialized
wrappedSplitsBytes.
{code:java}
public int hashCode(){
return Objects.hash(wrappedSplitBytes, sourceIndex);
} {code}
{code:java}
// old
public class HybridSourceSplit implements SourceSplit {
private final SourceSplit wrappedSplit;
private final int sourceIndex;
public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
this.sourceIndex = sourceIndex;
this.wrappedSplit = wrappedSplit;
}
public int sourceIndex() {
return this.sourceIndex;
}
public SourceSplit getWrappedSplit() {
return wrappedSplit;
}
@Override
public int hashCode() {
return Objects.hash(wrappedSplit, sourceIndex);
} ...
} {code}
{code:java}
{code}
// current(master)
public class HybridSourceSplit implements SourceSplit {
private final byte[] wrappedSplitBytes;
private final int wrappedSplitSerializerVersion;
private final int sourceIndex;
private final String splitId;
public HybridSourceSplit(
int sourceIndex, byte[] wrappedSplit, int serializerVersion, String
splitId) \{ this.sourceIndex = sourceIndex;
this.wrappedSplitBytes = wrappedSplit;
this.wrappedSplitSerializerVersion = serializerVersion; this.splitId =
splitId; }
public int sourceIndex() \{ return this.sourceIndex; }
public byte[] wrappedSplitBytes() \{ return wrappedSplitBytes; }
@Override
public int hashCode() \{ return Objects.hash(wrappedSplitBytes,
sourceIndex); }
...
}
{code:java}
{code}
detail diff:
[https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
detail issue:
https://issues.apache.org/jira/browse/FLINK-24064
was:
HybridSourceSplit's wrappedSplit has been changed from SourceSplit to
serialized wrappedSplitsBytes array. but hashcode methods did not match the
byte array. however here we should use Arrays.hashcode for serialized
wrappedSplitsBytes.
public int hashCode() {
return Objects.hash(wrappedSplitBytes, sourceIndex);
}
{code:java}
// old
public class HybridSourceSplit implements SourceSplit {
private final SourceSplit wrappedSplit;
private final int sourceIndex;
public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
this.sourceIndex = sourceIndex;
this.wrappedSplit = wrappedSplit;
}
public int sourceIndex() {
return this.sourceIndex;
}
public SourceSplit getWrappedSplit() {
return wrappedSplit;
}
@Override
public int hashCode() {
return Objects.hash(wrappedSplit, sourceIndex);
} ...
} {code}
{code:java}
// current(master)
public class HybridSourceSplit implements SourceSplit {
private final byte[] wrappedSplitBytes;
private final int wrappedSplitSerializerVersion;
private final int sourceIndex;
private final String splitId;
public HybridSourceSplit(
int sourceIndex, byte[] wrappedSplit, int serializerVersion, String
splitId) {
this.sourceIndex = sourceIndex;
this.wrappedSplitBytes = wrappedSplit;
this.wrappedSplitSerializerVersion = serializerVersion;
this.splitId = splitId;
}
public int sourceIndex() {
return this.sourceIndex;
}
public byte[] wrappedSplitBytes() {
return wrappedSplitBytes;
}
@Override
public int hashCode() {
return Objects.hash(wrappedSplitBytes, sourceIndex);
}
...
} {code}
detail diff:
[https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
detail issue:
https://issues.apache.org/jira/browse/FLINK-24064
> HybridSource split should use Arrays.hashcode
> ---------------------------------------------
>
> Key: FLINK-27381
> URL: https://issues.apache.org/jira/browse/FLINK-27381
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.14.4
> Reporter: Ran Tao
> Priority: Major
>
>
> HybridSourceSplit's wrappedSplit has been changed from SourceSplit to
> serialized wrappedSplitsBytes array. but hashcode methods did not match the
> byte array. however here we should use Arrays.hashcode for serialized
> wrappedSplitsBytes.
>
> {code:java}
> public int hashCode(){
> return Objects.hash(wrappedSplitBytes, sourceIndex);
> } {code}
>
> {code:java}
> // old
> public class HybridSourceSplit implements SourceSplit {
> private final SourceSplit wrappedSplit;
> private final int sourceIndex;
> public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
> this.sourceIndex = sourceIndex;
> this.wrappedSplit = wrappedSplit;
> }
> public int sourceIndex() {
> return this.sourceIndex;
> }
> public SourceSplit getWrappedSplit() {
> return wrappedSplit;
> }
> @Override
> public int hashCode() {
> return Objects.hash(wrappedSplit, sourceIndex);
> } ...
> } {code}
>
> {code:java}
> {code}
> // current(master)
> public class HybridSourceSplit implements SourceSplit {
> private final byte[] wrappedSplitBytes;
> private final int wrappedSplitSerializerVersion;
> private final int sourceIndex;
> private final String splitId;
> public HybridSourceSplit(
> int sourceIndex, byte[] wrappedSplit, int serializerVersion,
> String splitId) \{ this.sourceIndex = sourceIndex;
> this.wrappedSplitBytes = wrappedSplit;
> this.wrappedSplitSerializerVersion = serializerVersion; this.splitId
> = splitId; }
> public int sourceIndex() \{ return this.sourceIndex; }
> public byte[] wrappedSplitBytes() \{ return wrappedSplitBytes; }
> @Override
> public int hashCode() \{ return Objects.hash(wrappedSplitBytes,
> sourceIndex); }
> ...
> }
> {code:java}
> {code}
> detail diff:
> [https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
>
> detail issue:
> https://issues.apache.org/jira/browse/FLINK-24064
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)