[ 
https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Hazari updated FLINK-27193:
----------------------------------
    Description: 
We have a unit test that checks whether a Kryo serialisation/deserialisation 
round trip returns an equal value, but it fails.

The KryoSerializer is used for serialisation and deserialisation like this:
{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer
class KryoSerializerExtension {
    fun <T : Any> serde(t: T): T
    { val bytes = serialize(t) return deserialize(bytes, t::class) }
    fun serialize(any: Any): ByteArray {
    val config = ExecutionConfig()
    config.registerKryoType(any.javaClass)
    val serializer = KryoSerializer(any.javaClass, config)
    val output = DataOutputSerializer(1)
    serializer.serialize(any, output)
    return output.sharedBuffer
}
fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
    val config = ExecutionConfig()
    config.registerKryoType(kClass.java)
    val serializer = KryoSerializer(kClass.java, config)
    val input = DataInputDeserializer(bytes)
    return serializer.deserialize(input)
    }
}
{code}
 

The unit test is simply:
{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}{code}
This is the resulting assertion error (expected vs. actual):

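The record is large, and all of its components produce different hash values. 
The printed contents of the two records below are identical; only the identity 
hash codes (FieldRecord@3249a1ce vs. FieldRecord@769a58e5) differ. I am not 
sure how the record is modified.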
{code:java}
org.opentest4j.AssertionFailedError: 
expected: "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
  
{code}
 

Is there an issue with the way we are serialising and deserialising this?
 

Any help is appreciated.

  was:
We have a unit test that checks whether a Kryo serialisation/deserialisation 
round trip returns an equal value, but it fails.

The KryoSerializer is used for serialisation and deserialisation like this:
{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer
class KryoSerializerExtension {
    fun <T : Any> serde(t: T): T
    { val bytes = serialize(t) return deserialize(bytes, t::class) }
    fun serialize(any: Any): ByteArray {
    val config = ExecutionConfig()
    config.registerKryoType(any.javaClass)
    val serializer = KryoSerializer(any.javaClass, config)
    val output = DataOutputSerializer(1)
    serializer.serialize(any, output)
    return output.sharedBuffer
}
fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
    val config = ExecutionConfig()
    config.registerKryoType(kClass.java)
    val serializer = KryoSerializer(kClass.java, config)
    val input = DataInputDeserializer(bytes)
    return serializer.deserialize(input)
    }
}
{code}

 

The unit test is simply:
{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}{code}

This is the resulting assertion error (expected vs. actual):

The record is large, and all of its components produce different hash values. 
I am not sure what Kryo does that modifies the record.
{code:java}
org.opentest4j.AssertionFailedError: 
expected: "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, 
fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### 
CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, 
numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
  
{code}
 

Is there an issue with the way we are serialising and deserialising this?
 

Any help is appreciated.


> Kyro Serialisation and Deserialisation returns a different object 
> ------------------------------------------------------------------
>
>                 Key: FLINK-27193
>                 URL: https://issues.apache.org/jira/browse/FLINK-27193
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Akshay Hazari
>            Priority: Minor
>
> We have a unit test that checks whether a Kryo serialisation/deserialisation 
> round trip returns an equal value, but it fails.
> The KryoSerializer is used for serialisation and deserialisation like this:
> {code:java}
> import kotlin.reflect.KClass
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> import org.apache.flink.core.memory.DataInputDeserializer
> import org.apache.flink.core.memory.DataOutputSerializer
> class KryoSerializerExtension {
>     fun <T : Any> serde(t: T): T
>     { val bytes = serialize(t) return deserialize(bytes, t::class) }
>     fun serialize(any: Any): ByteArray {
>     val config = ExecutionConfig()
>     config.registerKryoType(any.javaClass)
>     val serializer = KryoSerializer(any.javaClass, config)
>     val output = DataOutputSerializer(1)
>     serializer.serialize(any, output)
>     return output.sharedBuffer
> }
> fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
>     val config = ExecutionConfig()
>     config.registerKryoType(kClass.java)
>     val serializer = KryoSerializer(kClass.java, config)
>     val input = DataInputDeserializer(bytes)
>     return serializer.deserialize(input)
>     }
> }
> {code}
>  
> The unit test is simply:
> {code:java}
> @Test
> fun fieldRecord() {
>     val record = getFieldRecord()
>     val result = kryo.serde(record)
>     assertThat(result).isEqualTo(record)
> }{code}
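> This is the resulting assertion error (expected vs. actual):
> The record is large, and all of its components produce different hash values. 
> The printed contents of the two records below are identical; only the identity 
> hash codes (FieldRecord@3249a1ce vs. FieldRecord@769a58e5) differ. I am not 
> sure how the record is modified.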
> {code:java}
> org.opentest4j.AssertionFailedError: 
> expected: "FieldRecord(name=foo, type=string, 
> fieldCounter=FieldCounter(populated=1, missing=3), 
> countDistinctCalculator=### CPD SKETCH - PREAMBLE:
>   Flavor         : SPARSE
>   LgK            : 10
>   Merge Flag     : false
>   Error Const    : 0.5887050112577373
>   RSE            : 0.01839703160180429
>   Seed Hash      : 93cc | 37836
>   Num Coupons    : 2
>   Num Pairs (SV) : 2
>   First Inter Col: 0
>   Valid Window   : false
>   Valid PairTable: true
>   Window Offset  : 0
>   KxP            : 1023.375
>   HIP Accum      : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
> frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY: 
>    Empty                        : false
>    Direct, Capacity bytes       : false, 
>    Estimation Mode              : false
>    K                            : 128
>    N                            : 2
>    Levels (Needed, Total, Valid): 0, 0, 0
>    Level Bit Pattern            : 0
>    BaseBufferCount              : 2
>    Combined Buffer Capacity     : 4
>    Retained Items               : 2
>    Compact Storage Bytes        : 48
>    Updatable Storage Bytes      : 64
>    Normalized Rank Error        : 1.406%
>    Normalized Rank Error (PMF)  : 1.711%
>    Min Value                    : 1.000000e+00
>    Max Value                    : 3.000000e+00
> ### END SKETCH SUMMARY
> , 
> numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0000000000000004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
> mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
> but was : "FieldRecord(name=foo, type=string, 
> fieldCounter=FieldCounter(populated=1, missing=3), 
> countDistinctCalculator=### CPD SKETCH - PREAMBLE:
>   Flavor         : SPARSE
>   LgK            : 10
>   Merge Flag     : false
>   Error Const    : 0.5887050112577373
>   RSE            : 0.01839703160180429
>   Seed Hash      : 93cc | 37836
>   Num Coupons    : 2
>   Num Pairs (SV) : 2
>   First Inter Col: 0
>   Valid Window   : false
>   Valid PairTable: true
>   Window Offset  : 0
>   KxP            : 1023.375
>   HIP Accum      : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, 
> frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY: 
>    Empty                        : false
>    Direct, Capacity bytes       : false, 
>    Estimation Mode              : false
>    K                            : 128
>    N                            : 2
>    Levels (Needed, Total, Valid): 0, 0, 0
>    Level Bit Pattern            : 0
>    BaseBufferCount              : 2
>    Combined Buffer Capacity     : 4
>    Retained Items               : 2
>    Compact Storage Bytes        : 48
>    Updatable Storage Bytes      : 64
>    Normalized Rank Error        : 1.406%
>    Normalized Rank Error (PMF)  : 1.711%
>    Min Value                    : 1.000000e+00
>    Max Value                    : 3.000000e+00
> ### END SKETCH SUMMARY
> , 
> numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0000000000000004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, 
> mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
>   
> {code}
>  
> Is there an issue with the way we are serialising and deserialising this?
> Any help is appreciated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to