[ 
https://issues.apache.org/jira/browse/BEAM-2807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Harper updated BEAM-2807:
--------------------------------
    Description: 
*Beam version:* 2.1.0
*Runner:* FlinkRunner


We're seeing the following exception when checkpointing, which is causing our 
job to restart:

{code}
2017-08-25 09:42:17,658 INFO  org.apache.flink.runtime.taskmanager.Task                     - Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32) (f00a31b722a31030f18d83ac613de21d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:966)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:229)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:151)
        at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoWriterV3.writeStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:107)
        at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:104)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:293)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:286)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:329)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:263)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:178)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:97)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
{code}

From debugging locally, I've narrowed it down to here:

{code}
    Caused by: java.lang.NullPointerException
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
{code}

Specifically, at 
[CoderTypeSerializer.java#189|https://github.com/apache/beam/blob/609016d700c84800cf942482fb7cd2ddaa420b00/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L189],
 the call to {{DataOutputStream.writeUTF(String)}} fails: {{writeUTF}} first 
reads the length of the string it is given, and that throws the NPE because 
the {{coderName}} field is null.
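
To show this failure mode in isolation, here is a minimal sketch outside of Beam and Flink. The class and method names ({{WriteUtfNullDemo}}, {{writeUtfThrowsNpe}}) are made up for this example; only {{DataOutputStream.writeUTF}} is the real JDK call from the stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WriteUtfNullDemo {

    // Hypothetical helper: reports whether writeUTF throws a
    // NullPointerException for the given value.
    static boolean writeUtfThrowsNpe(String value) throws IOException {
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            // writeUTF needs the string's length before it can write anything,
            // so a null argument fails here.
            out.writeUTF(value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("null string throws NPE: " + writeUtfThrowsNpe(null));
        System.out.println("non-null string throws NPE: " + writeUtfThrowsNpe("SomeCoder"));
    }
}
```

So any code path that hands a null name to {{writeUTF}} will fail in the same way, regardless of the state backend.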

I think this stems from the 
[constructor|https://github.com/apache/beam/blob/609016d700c84800cf942482fb7cd2ddaa420b00/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L164],
 which sets {{coderName}} by calling {{.getClass().getCanonicalName()}} on 
the {{coder}} that is passed in.

While debugging, I noticed that {{.getClass().getCanonicalName()}} returns 
{{null}} for an instance of 
[Count$CountFn|https://github.com/apache/beam/blob/2040e2bd4203f81ea63966110e4eef3a1ff72393/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L134].
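
For context, {{Class.getCanonicalName()}} is documented to return {{null}} for classes without a canonical name, such as local and anonymous classes, while {{Class.getName()}} always returns a name. The sketch below shows the difference. Everything in it ({{CanonicalNameDemo}}, {{NamedNested}}, {{newAnonymous}}, {{nameForSnapshot}}) is hypothetical and only illustrates one possible null-safe lookup; it is not the Beam code and not necessarily how this should be fixed.

```java
import java.io.Serializable;

public class CanonicalNameDemo {

    // A named static nested class: it has a canonical name.
    static class NamedNested implements Serializable {}

    // Creates an instance of an anonymous class, which has no canonical name.
    static Object newAnonymous() {
        return new Serializable() {};
    }

    // Hypothetical null-safe lookup: prefer the canonical name, but fall back
    // to the binary name from getName() when there is no canonical name.
    static String nameForSnapshot(Object o) {
        String canonical = o.getClass().getCanonicalName();
        return canonical != null ? canonical : o.getClass().getName();
    }

    public static void main(String[] args) {
        Object named = new NamedNested();
        Object anonymous = newAnonymous();

        // Non-null for the named nested class.
        System.out.println(named.getClass().getCanonicalName());
        // null for the anonymous class.
        System.out.println(anonymous.getClass().getCanonicalName());
        // getName() still returns a usable binary name.
        System.out.println(anonymous.getClass().getName());
        // The fallback never returns null.
        System.out.println(nameForSnapshot(anonymous));
    }
}
```

If the coder in question is an anonymous or local class, a name obtained this way would be non-null, so the later {{writeUTF}} call would have something to write.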







> NullPointerException during checkpoint on FlinkRunner
> -----------------------------------------------------
>
>                 Key: BEAM-2807
>                 URL: https://issues.apache.org/jira/browse/BEAM-2807
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>    Affects Versions: 2.1.0
>            Reporter: Daniel Harper
>            Assignee: Kenneth Knowles
>            Priority: Blocker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
