[ 
https://issues.apache.org/jira/browse/FLINK-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908715#comment-16908715
 ] 

Hequn Cheng edited comment on FLINK-13740 at 8/16/19 5:14 AM:
--------------------------------------------------------------

[~till.rohrmann] Thanks a lot for pointing out the failure.

The test should be restarted after the `Artificial Failure`, as the restart 
strategy has been set with restartAttempts = 1. It is failed because there is 
another exception, as it is shown below:
{code:java}
Caused by: java.lang.IllegalStateException: Concurrent access to 
KryoSerializer. Thread 1: GroupTableAggregate -> Calc(select=[b AS category, f0 
AS v1, f1 AS v2]) (1/4) , Thread 2: AsyncOperations-thread-1
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:630)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:285)
        at 
org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:526)
        at 
org.apache.flink.table.dataformat.BinaryGeneric.materialize(BinaryGeneric.java:60)
        at 
org.apache.flink.table.dataformat.LazyBinaryFormat.ensureMaterialized(LazyBinaryFormat.java:92)
        at 
org.apache.flink.table.dataformat.BinaryGeneric.copy(BinaryGeneric.java:68)
        at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:63)
        at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:40)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
        at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:296)
        at 
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
        at 
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
        at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
        at 
org.apache.flink.table.runtime.operators.aggregate.GroupTableAggFunction.processElement(GroupTableAggFunction.java:117)
{code}

And this exception is thrown because of the same KryoSerializer object is used 
by two threads: one is the table aggregate thread, the other is the async 
operator thread. The TypeSerializer is not thread safe, to avoid unpredictable 
side effects, it is recommended to call duplicate() method and use one 
serializer instance per thread. 

One option to fix the problem is call the duplicate() method when create the 
{{BinaryGeneric}}. Other option like making the two thread unrelated would also 
be considered however may need further discussions. 

This looks like a common problem for blink planner. Not sure whether it is a 
blocker for release-1.9? [~jark] [~lzljs3620320]

Best, Hequn

 


was (Author: hequn8128):
[~till.rohrmann] Thanks a lot for pointing out the failure.

The test should be restarted after the `Artificial Failure`, as the restart 
strategy has been set with restartAttempts = 1. It is failed because there is 
another exception, as it is shown below:
{code:java}
Caused by: java.lang.IllegalStateException: Concurrent access to 
KryoSerializer. Thread 1: GroupTableAggregate -> Calc(select=[b AS category, f0 
AS v1, f1 AS v2]) (1/4) , Thread 2: AsyncOperations-thread-1
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:630)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:285)
        at 
org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:526)
        at 
org.apache.flink.table.dataformat.BinaryGeneric.materialize(BinaryGeneric.java:60)
        at 
org.apache.flink.table.dataformat.LazyBinaryFormat.ensureMaterialized(LazyBinaryFormat.java:92)
        at 
org.apache.flink.table.dataformat.BinaryGeneric.copy(BinaryGeneric.java:68)
        at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:63)
        at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:40)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
        at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
        at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:296)
        at 
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
        at 
org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
        at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
        at 
org.apache.flink.table.runtime.operators.aggregate.GroupTableAggFunction.processElement(GroupTableAggFunction.java:117)
{code}

And this exception is thrown because of the same KryoSerializer object is used 
by two threads: one is the table aggregate thread, the other is the async 
operator thread. The TypeSerializer is not thread safe, to avoid unpredictable 
side effects, it is recommended to call duplicate() method and use one 
serializer instance per thread. 

One option to fix the problem is call the duplicate() method when create the 
{{BinaryGeneric}}. Other option like making the two thread unrelated would also 
be considered however may need further discussions. 

Not sure is it a blocker for release-1.9? [~jark] [~lzljs3620320]

Best, Hequn

 

> TableAggregateITCase.testNonkeyedFlatAggregate failed on Travis
> ---------------------------------------------------------------
>
>                 Key: FLINK-13740
>                 URL: https://issues.apache.org/jira/browse/FLINK-13740
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Till Rohrmann
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.10.0
>
>
> The {{TableAggregateITCase.testNonkeyedFlatAggregate}} failed on Travis with 
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.testNonkeyedFlatAggregate(TableAggregateITCase.scala:93)
> Caused by: java.lang.Exception: Artificial Failure
> {code}
> https://api.travis-ci.com/v3/job/225551182/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to