[ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Akshay Hazari updated FLINK-27193:
----------------------------------
    Description:
We have a unit test that checks whether Kryo serialisation followed by deserialisation returns the same value, but it fails.

The KryoSerializer is used for both serialisation and deserialisation like this:
{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer

class KryoSerializerExtension {

    // Round trip: serialise the value, then deserialise it as the same class.
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        // sharedBuffer is the internal backing array and may contain unused trailing bytes.
        return output.sharedBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
{code}
The unit test simply looks like this:
{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}{code}
This is the actual vs expected assertion error.
The record is huge, and the hash of every component comes out different. I am not sure how the record is modified.
{code:java}
org.opentest4j.AssertionFailedError:
expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor : SPARSE
  LgK : 10
  Merge Flag : false
  Error Const : 0.5887050112577373
  RSE : 0.01839703160180429
  Seed Hash : 93cc | 37836
  Num Coupons : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window : false
  Valid PairTable: true
  Window Offset : 0
  KxP : 1023.375
  HIP Accum : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY:
  Empty : false
  Direct, Capacity bytes : false,
  Estimation Mode : false
  K : 128
  N : 2
  Levels (Needed, Total, Valid): 0, 0, 0
  Level Bit Pattern : 0
  BaseBufferCount : 2
  Combined Buffer Capacity : 4
  Retained Items : 2
  Compact Storage Bytes : 48
  Updatable Storage Bytes : 64
  Normalized Rank Error : 1.406%
  Normalized Rank Error (PMF) : 1.711%
  Min Value : 1.000000e+00
  Max Value : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
 but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor : SPARSE
  LgK : 10
  Merge Flag : false
  Error Const : 0.5887050112577373
  RSE : 0.01839703160180429
  Seed Hash : 93cc | 37836
  Num Coupons : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window : false
  Valid PairTable: true
  Window Offset : 0
  KxP : 1023.375
  HIP Accum : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY:
  Empty : false
  Direct, Capacity bytes : false,
  Estimation Mode : false
  K : 128
  N : 2
  Levels (Needed, Total, Valid): 0, 0, 0
  Level Bit Pattern : 0
  BaseBufferCount : 2
  Combined Buffer Capacity : 4
  Retained Items : 2
  Compact Storage Bytes : 48
  Updatable Storage Bytes : 64
  Normalized Rank Error : 1.406%
  Normalized Rank Error (PMF) : 1.711%
  Min Value : 1.000000e+00
  Max Value : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
{code}
Is there an issue with the way we are serialising and deserialising this?
Any help is appreciated.
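One possible reading of the error above, offered as an assumption rather than a confirmed diagnosis: the expected and actual text are identical and differ only in the identity suffix (FieldRecord@3249a1ce vs FieldRecord@769a58e5). That pattern would appear if one of the component classes does not override equals(), so that the comparison falls back to reference identity after the round trip. The minimal sketch below uses hypothetical stand-in classes (OpaqueSketch and Record are not the reporter's types and not part of any Flink or sketch library API) to show how two objects can print the same and still compare as unequal.

```kotlin
// Hypothetical stand-in for a component class that does not override equals(),
// so equality falls back to reference identity.
class OpaqueSketch(private val n: Int) {
    override fun toString(): String = "OpaqueSketch(n=$n)"
}

// Hypothetical record. A data class equals() delegates to each property's equals().
data class Record(val name: String, val sketch: OpaqueSketch)

fun main() {
    val original = Record("foo", OpaqueSketch(2))
    // Simulates a serialisation round trip: same contents, freshly built objects.
    val copy = Record("foo", OpaqueSketch(2))

    println(original.toString() == copy.toString()) // true: same printed form
    println(original == copy)                       // false: OpaqueSketch compares by identity
}
```

If this turns out to be the cause, the round trip itself would be fine, and the test would need a comparison that does not rely on equals() of those components, for example comparing the serialised bytes or comparing field by field.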
> Kyro Serialisation and Deserialisation returns a different object
> ------------------------------------------------------------------
>
>                 Key: FLINK-27193
>                 URL: https://issues.apache.org/jira/browse/FLINK-27193
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Akshay Hazari
>            Priority: Minor
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)