[https://issues.apache.org/jira/browse/FLINK-13702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16963894#comment-16963894]
Jark Wu edited comment on FLINK-13702 at 10/31/19 11:30 AM:
------------------------------------------------------------
I thought about calling {{duplicate}} too, but calling {{duplicate}} in every BinaryGeneric {{materialize()}} imposes a performance penalty (in most cases the serializer is a KryoSerializer).
Another idea is to expose {{ensureMaterialized(TypeSerializer<T> javaObjectSer)}} on BinaryGeneric, which materializes using the serializer passed as the parameter. {{BinaryGenericSerializer}} would then call {{generic.ensureMaterialized(ser)}} before accessing the generic's segments. This ensures that each thread uses its own serializer instance, and we don't need to duplicate the serializer for every record.
Some pseudo code:
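{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.util.InstantiationUtil;

// BinaryGeneric (methods only; Binary holds the segments, offset and size)

// Materializes once, with a serializer owned by the caller (e.g. BinaryGenericSerializer),
// so no duplicate() is needed per record.
public void ensureMaterialized(TypeSerializer<T> serializer) {
    if (binary == null) {
        binary = materialize(serializer);
    }
}

// Serializes javaObject with the given serializer into a single memory segment.
public Binary materialize(TypeSerializer<T> serializer) {
    try {
        byte[] bytes = InstantiationUtil.serializeToByteArray(serializer, javaObject);
        return new Binary(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
    } catch (IOException e) {
        // serializeToByteArray declares IOException
        throw new UncheckedIOException(e);
    }
}

// Without a caller-provided serializer, fall back to a private copy of javaObjectSer.
@Override
public Binary materialize() {
    return materialize(javaObjectSer.duplicate());
}
{code}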
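To make the threading argument concrete outside of Flink, here is a minimal, self-contained sketch. All class names in it ({{ReusingSerializer}}, {{LazyValue}}, {{LazyValueSerializer}}, {{LazyMaterializationDemo}}) are hypothetical stand-ins, not Flink APIs: {{ReusingSerializer}} mimics a stateful serializer such as Kryo by reusing an internal buffer, {{LazyValue}} plays the role of BinaryGeneric, and {{LazyValueSerializer}} plays the role of BinaryGenericSerializer. Each worker thread duplicates the serializer once, and every record is then materialized through {{ensureMaterialized}} with that thread's own instance.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LazyMaterializationDemo {
    public static void main(String[] args) throws Exception {
        ReusingSerializer shared = new ReusingSerializer(); // like javaObjectSer, shared by all records
        LazyValueSerializer template = new LazyValueSerializer(shared);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            final int id = t;
            // One duplicate per thread, not one per record.
            LazyValueSerializer perThread = template.duplicate();
            results.add(pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    String s = "thread-" + id + "-record-" + i;
                    byte[] out = perThread.toBytes(new LazyValue(s, shared));
                    if (!Arrays.equals(out, s.getBytes(StandardCharsets.UTF_8))) {
                        return false;
                    }
                }
                return true;
            }));
        }
        boolean allOk = true;
        for (Future<Boolean> f : results) {
            allOk &= f.get();
        }
        pool.shutdown();
        System.out.println(allOk ? "all records materialized correctly" : "corrupted output");
    }
}

/** Hypothetical stand-in for a stateful serializer such as Kryo: it reuses an internal buffer. */
class ReusingSerializer {
    private final StringBuilder buffer = new StringBuilder();

    byte[] serialize(String value) {
        buffer.setLength(0); // mutates instance state, so one instance must not be shared across threads
        buffer.append(value);
        return buffer.toString().getBytes(StandardCharsets.UTF_8);
    }

    ReusingSerializer duplicate() {
        return new ReusingSerializer();
    }
}

/** Hypothetical analogue of BinaryGeneric: a Java object plus its lazily computed bytes. */
class LazyValue {
    private final String javaObject;
    private final ReusingSerializer javaObjectSer;
    private byte[] binary; // null until materialized

    LazyValue(String javaObject, ReusingSerializer javaObjectSer) {
        this.javaObject = javaObject;
        this.javaObjectSer = javaObjectSer;
    }

    /** Materializes with the caller's serializer; a no-op if already materialized. */
    void ensureMaterialized(ReusingSerializer serializer) {
        if (binary == null) {
            binary = serializer.serialize(javaObject);
        }
    }

    /** Fallback without a caller-provided serializer: pays for a duplicate on every call. */
    byte[] materialize() {
        return javaObjectSer.duplicate().serialize(javaObject);
    }

    byte[] bytes() {
        return binary;
    }
}

/** Hypothetical analogue of BinaryGenericSerializer: each instance owns its own serializer. */
class LazyValueSerializer {
    private final ReusingSerializer ser;

    LazyValueSerializer(ReusingSerializer ser) {
        this.ser = ser;
    }

    LazyValueSerializer duplicate() {
        return new LazyValueSerializer(ser.duplicate());
    }

    byte[] toBytes(LazyValue value) {
        value.ensureMaterialized(ser); // materialize before accessing the bytes
        return value.bytes();
    }
}
{code}

Running the demo should print "all records materialized correctly". In this sketch {{duplicate()}} runs four times (once per worker) instead of once per record; only callers without a serializer of their own go through the {{materialize()}} fallback and pay the per-call copy.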
What do you think?
> BaseMapSerializerTest.testDuplicate fails on Travis
> ---------------------------------------------------
>
> Key: FLINK-13702
> URL: https://issues.apache.org/jira/browse/FLINK-13702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Till Rohrmann
> Assignee: Dawid Wysakowicz
> Priority: Critical
> Labels: test-stability
>
> {{BaseMapSerializerTest.testDuplicate}} fails on Travis with a
> {{java.lang.IndexOutOfBoundsException}}.
> https://api.travis-ci.org/v3/job/570973199/log.txt
--
This message was sent by Atlassian Jira
(v8.3.4#803005)