[ https://issues.apache.org/jira/browse/FLINK-13124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantin Knauf updated FLINK-13124: ------------------------------------- Description: When running the {{StateMachineExample}} (with the universal Kafka connector instead of 0.10) the Job always crashes with the following exception, when stopping it with {{flink stop <job-id>}}. {noformat} taskmanager_1 | 2019-07-09 09:00:35,498 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (8568229c7efcddf75545a503bdb737f8) switched from RUNNING to FAILED. taskmanager_1 | org.apache.flink.util.FlinkException: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:85) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:131) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:95) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:59) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:102) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) taskmanager_1 | at java.lang.Thread.run(Thread.java:748) taskmanager_1 | Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:184) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) taskmanager_1 | at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) taskmanager_1 | ... 1 more {noformat} Rebased on a `86bee8679112e76372a84083b1af18722644e1a0` the stacktrace looks like this: {noformat} taskmanager_1 | 2019-07-08 13:01:01,287 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (54504e053415cee82a2a7b4293d325e9) switched from RUNNING to FAILED. taskmanager_1 | org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) taskmanager_1 | at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) taskmanager_1 | at java.lang.Thread.run(Thread.java:748) {noformat} Rebased on a {{86bee8679112e76372a84083b1af18722644e1a0}} without {{ExceptionUtils.rethrowException(error, error.getMessage());}} in {{Handover#pollNext()}}: {noformat} 2019-07-09 09:00:35,498 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (8568229c7efcddf75545a503bdb737f8) switched from RUNNING to FAILED. taskmanager_1 | org.apache.flink.util.FlinkException: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:85) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:131) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:95) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:59) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:102) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) taskmanager_1 | at java.lang.Thread.run(Thread.java:748) taskmanager_1 | Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:184) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) taskmanager_1 | at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) taskmanager_1 | ... 1 more {noformat} was: When running the {{StateMachineExample}} (with the universal Kafka connector instead of 0.10) the Job always crashes with the following exception, when stopping it with {{flink stop <job-id>}}. {noformat} taskmanager_1 | 2019-07-09 09:00:35,498 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (8568229c7efcddf75545a503bdb737f8) switched from RUNNING to FAILED. taskmanager_1 | org.apache.flink.util.FlinkException: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:85) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:131) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:95) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:59) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:102) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) taskmanager_1 | at java.lang.Thread.run(Thread.java:748) taskmanager_1 | Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:184) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) taskmanager_1 | at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) taskmanager_1 | ... 1 more Rebased on a `86bee8679112e76372a84083b1af18722644e1a0` the stacktrace looks like this: {noformat} taskmanager_1 | 2019-07-08 13:01:01,287 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (54504e053415cee82a2a7b4293d325e9) switched from RUNNING to FAILED. taskmanager_1 | org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) taskmanager_1 | at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) taskmanager_1 | at java.lang.Thread.run(Thread.java:748) {noformat} Rebased on a {{86bee8679112e76372a84083b1af18722644e1a0}} without {{ExceptionUtils.rethrowException(error, error.getMessage());}} in {{Handover#pollNext()}}: {noformat} 2019-07-09 09:00:35,498 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (8568229c7efcddf75545a503bdb737f8) switched from RUNNING to FAILED. taskmanager_1 | org.apache.flink.util.FlinkException: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:85) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:131) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:95) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:59) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:102) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) taskmanager_1 | at java.lang.Thread.run(Thread.java:748) taskmanager_1 | Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:184) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) taskmanager_1 | at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) taskmanager_1 | at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) taskmanager_1 | ... 1 more {noformat} > Stop fails with Universal Kafka Consumer > ---------------------------------------- > > Key: FLINK-13124 > URL: https://issues.apache.org/jira/browse/FLINK-13124 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.9.0 > Reporter: Konstantin Knauf > Assignee: Kostas Kloudas > Priority: Blocker > > When running the {{StateMachineExample}} (with the universal Kafka connector > instead of 0.10) the Job always crashes with the following exception, when > stopping it with {{flink stop <job-id>}}. > {noformat} > taskmanager_1 | 2019-07-09 09:00:35,498 INFO > org.apache.flink.runtime.taskmanager.Task - Source: > Custom Source (1/1) (8568229c7efcddf75545a503bdb737f8) switched from RUNNING > to FAILED. > taskmanager_1 | org.apache.flink.util.FlinkException: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:85) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:131) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:95) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:59) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:102) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335) > taskmanager_1 | at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) > taskmanager_1 | at java.lang.Thread.run(Thread.java:748) > taskmanager_1 | Caused by: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:184) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) > taskmanager_1 | at > org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) > taskmanager_1 | at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > taskmanager_1 | at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > taskmanager_1 | at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > taskmanager_1 | ... 1 more > {noformat} > Rebased on a `86bee8679112e76372a84083b1af18722644e1a0` the stacktrace looks > like this: > {noformat} > taskmanager_1 | 2019-07-08 13:01:01,287 INFO > org.apache.flink.runtime.taskmanager.Task - Source: > Custom Source (1/1) (54504e053415cee82a2a7b4293d325e9) switched from RUNNING > to FAILED. > taskmanager_1 | > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) > taskmanager_1 | at > org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) > taskmanager_1 | at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > taskmanager_1 | at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > taskmanager_1 | at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > taskmanager_1 | at java.lang.Thread.run(Thread.java:748) > {noformat} > Rebased on a {{86bee8679112e76372a84083b1af18722644e1a0}} without > {{ExceptionUtils.rethrowException(error, error.getMessage());}} in > {{Handover#pollNext()}}: > {noformat} > 2019-07-09 09:00:35,498 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source (1/1) > (8568229c7efcddf75545a503bdb737f8) switched from RUNNING to FAILED. > taskmanager_1 | org.apache.flink.util.FlinkException: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:85) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:131) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:95) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:59) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:102) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335) > taskmanager_1 | at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) > taskmanager_1 | at java.lang.Thread.run(Thread.java:748) > taskmanager_1 | Caused by: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:184) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) > taskmanager_1 | at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:814) > taskmanager_1 | at > org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:124) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:108) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:114) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:729) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:604) > taskmanager_1 | at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:124) > taskmanager_1 | at > org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1151) > taskmanager_1 | at > java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) > taskmanager_1 | at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > taskmanager_1 | at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > taskmanager_1 | ... 1 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)