[ 
https://issues.apache.org/jira/browse/BEAM-11191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tobias Kaymak updated BEAM-11191:
---------------------------------
    Description: 
When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 or 1.10.2 
Docker image,
 the following exception is visible for the failing job on the *job manager*:

{{{{2020-11-04 09:27:14}}{{java.lang.RuntimeException: Failed to cleanup global 
state.}}{{    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)}}{{
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)}}{{
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)}}{{
    at 
[org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)}}{{
    at 
[org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)}}{{
    at 
[org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)}}{{
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)}}{{    at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)}}{{    at 
java.lang.Thread.run(Thread.java:748)}}{{Caused by: 
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.flink.runtime.state.VoidNamespace}}{{    at 
org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)}}{{
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)}}{{
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)}}{{
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)}}{{
    ... 17 more}}}}
{{  }}
{{ This is from the *task manager's* logs:}}
{{  }}
{{2020-11-04 08:46:31,250 WARN org.apache.flink.runtime.taskmanager.Task [] - 
BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable}}
{{ ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) -> 
BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
 -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0}}
{{ java.lang.RuntimeException: Failed to cleanup global state.}}
{{ at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
{{ at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
{{ at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
{{ at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
{{ at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]}}
{{ Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.flink.runtime.state.VoidNamespace}}
{{ at 
org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
 ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
 ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
{{ at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
{{ ... 17 more}}

  was:
When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 or 1.10.2 
Docker image,
the following exception is visible for the failing job on the *job manager*:


{{2020-11-04 09:27:14}}{{java.lang.RuntimeException: Failed to cleanup global 
state.}}{{    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)}}{{
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)}}{{
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)}}{{
    at 
[org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)}}{{
    at 
[org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)}}{{
    at 
[org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)}}{{
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)}}{{
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)}}{{    at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)}}{{    at 
java.lang.Thread.run(Thread.java:748)}}{{Caused by: 
java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.flink.runtime.state.VoidNamespace}}{{    at 
org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)}}{{
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)}}{{
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)}}{{
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)}}{{
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)}}{{
    ... 17 more}}
 
This is from the *task manager's* logs:
 
 
{{2020-11-04 08:46:31,250 WARN org.apache.flink.runtime.taskmanager.Task [] - 
BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable
ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) -> 
BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
 -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0
java.lang.RuntimeException: Failed to cleanup global state.
 at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
 at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
 at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
 at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.flink.runtime.state.VoidNamespace
 at 
org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
 ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
 ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
 ... 17 more}}

       Priority: P0  (was: P2)

> clearGlobalState() method failing on Flink runner 1.10 and 1.11
> ---------------------------------------------------------------
>
>                 Key: BEAM-11191
>                 URL: https://issues.apache.org/jira/browse/BEAM-11191
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.25.0
>            Reporter: Tobias Kaymak
>            Priority: P0
>
> When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 or 1.10.2 
> Docker image,
>  the following exception is visible for the failing job on the *job manager*:
> {{{{2020-11-04 09:27:14}}{{java.lang.RuntimeException: Failed to cleanup 
> global state.}}{{    at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)}}{{
>     at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)}}{{
>     at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)}}{{
>     at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)}}{{
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)}}{{
>     at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)}}{{
>     at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)}}{{
>     at 
> [org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)}}{{
>     at 
> [org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)}}{{
>     at 
> [org.apache.flink.streaming.runtime.io|http://org.apache.flink.streaming.runtime.io/].StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)}}{{
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)}}{{
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)}}{{
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)}}{{
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)}}{{
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)}}{{
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)}}{{    
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)}}{{    at 
> java.lang.Thread.run(Thread.java:748)}}{{Caused by: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> org.apache.flink.runtime.state.VoidNamespace}}{{    at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)}}{{
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)}}{{
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)}}{{
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)}}{{
>     at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)}}{{
>     ... 17 more}}}}
> {{  }}
> {{ This is from the *task manager's* logs:}}
> {{  }}
> {{2020-11-04 08:46:31,250 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable}}
> {{ ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) -> 
> BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
>  -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0}}
> {{ java.lang.RuntimeException: Failed to cleanup global state.}}
> {{ at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
>  
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
> {{ at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
>  
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
> {{ at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
>  
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
> {{ at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
>  
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> [flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> [flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]}}
> {{ Caused by: java.lang.ClassCastException: java.lang.String cannot be cast 
> to org.apache.flink.runtime.state.VoidNamespace}}
> {{ at 
> org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
>  ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
>  ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]}}
> {{ at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
>  
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]}}
> {{ ... 17 more}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to