[ 
https://issues.apache.org/jira/browse/FLINK-21963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310675#comment-17310675
 ] 

Robert Metzger commented on FLINK-21963:
----------------------------------------

The test is difficult to reproduce locally (stopped after 229 runs).

There are two problems here:

1. The job initially scales up, even though this is a test for scale down. 
When the test fails, the tasks are deployed on one TM, then cancelled for the 
scale-up and deployed again on two TMs. The CountDownLatch count of running 
instances is then wrong.
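
To illustrate the miscount, here is a minimal standalone sketch. It is not the 
actual test code: the class name, the method name and the instance counts (2 
on one TM, 4 on two TMs) are assumptions chosen for illustration. It only 
shows that a latch which is counted down once per task start opens too early 
when tasks are started, cancelled and started again.

```java
import java.util.concurrent.CountDownLatch;

public class LatchMiscountSketch {

    /**
     * Replays a sequence of deployments against one latch and returns how many
     * instances are actually running at the moment the latch reaches zero.
     * Each deployment is assumed to cancel all instances of the previous one.
     * Returns -1 if the latch never opens.
     */
    static int runningWhenLatchOpens(int expectedInstances, int... deployments) {
        CountDownLatch latch = new CountDownLatch(expectedInstances);
        for (int parallelism : deployments) {
            int running = 0; // earlier instances were cancelled by the rescale
            for (int i = 0; i < parallelism; i++) {
                running++;
                latch.countDown(); // every task start counts, even a short-lived one
                if (latch.getCount() == 0) {
                    return running;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Expected path: all 4 instances start in a single deployment.
        System.out.println(runningWhenLatchOpens(4, 4)); // prints 4
        // Failure path: 2 instances start on one TM, are cancelled, then 4 start on two TMs.
        System.out.println(runningWhenLatchOpens(4, 2, 4)); // prints 2
    }
}
```

In the second call the latch opens while only 2 of the 4 expected instances 
are running, because the two cancelled starts were already counted.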

On a side note:
It seems that the ExecutionGraph gets out of sync, but eventually reaches 
consistency again. The log below shows all statements involving 
ca080a6391c7887c02a00ee3203b901a.
ca080a6391c7887c02a00ee3203b901a is already in state CANCELED on the 
JobManager when the TaskManager receives it.
My assumption is/was that we should only mark something as CANCELED once the 
cancellation has succeeded. 

{code}
14:52:16,090 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Custom Source -> Sink: Unnamed (1/2) (ca080a6391c7887c02a00ee3203b901a) 
switched from CREATED to DEPLOYING.
14:52:16,090 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying 
Source: Custom Source -> Sink: Unnamed (1/2) (attempt #0) with attempt id 
ca080a6391c7887c02a00ee3203b901a to 5420c4ef-c20e-49fa-9dd9-5df302da28c2 @ 
localhost (dataPort=-1) with allocation id ed7c4f301d07c27b59aa38d37c9f6057
14:52:16,092 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Custom Source -> Sink: Unnamed (1/2) (ca080a6391c7887c02a00ee3203b901a) 
switched from DEPLOYING to CANCELING.
14:52:16,094 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Custom Source -> Sink: Unnamed (1/2) (ca080a6391c7887c02a00ee3203b901a) 
switched from CANCELING to CANCELED.
14:52:16,094 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding 
the results produced by task execution ca080a6391c7887c02a00ee3203b901a.
14:52:16,120 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Source: Custom Source -> Sink: Unnamed (1/2)#0 
(ca080a6391c7887c02a00ee3203b901a), deploy into slot with allocation id 
ed7c4f301d07c27b59aa38d37c9f6057.
14:52:16,130 [Source: Custom Source -> Sink: Unnamed (1/2)#0] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
Custom Source -> Sink: Unnamed (1/2)#0 (ca080a6391c7887c02a00ee3203b901a) 
switched from CREATED to DEPLOYING.
14:52:16,168 [Source: Custom Source -> Sink: Unnamed (1/2)#0] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR 
files for task Source: Custom Source -> Sink: Unnamed (1/2)#0 
(ca080a6391c7887c02a00ee3203b901a) [DEPLOYING].
14:52:16,182 [Source: Custom Source -> Sink: Unnamed (1/2)#0] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Registering 
task at network: Source: Custom Source -> Sink: Unnamed (1/2)#0 
(ca080a6391c7887c02a00ee3203b901a) [DEPLOYING].
14:52:16,182 [Source: Custom Source -> Sink: Unnamed (1/2)#0] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
Custom Source -> Sink: Unnamed (1/2)#0 (ca080a6391c7887c02a00ee3203b901a) 
switched from DEPLOYING to RUNNING.
14:52:16,183 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
fail task externally Source: Custom Source -> Sink: Unnamed (1/2)#0 
(ca080a6391c7887c02a00ee3203b901a).
14:52:16,184 [flink-akka.actor.default-dispatcher-2] WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
Custom Source -> Sink: Unnamed (1/2)#0 (ca080a6391c7887c02a00ee3203b901a) 
switched from RUNNING to FAILED with failure cause: 
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution 
attempt ca080a6391c7887c02a00ee3203b901a was not found.
14:52:16,185 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
cancellation of task code Source: Custom Source -> Sink: Unnamed (1/2)#0 
(ca080a6391c7887c02a00ee3203b901a).
14:52:16,241 [Source: Custom Source -> Sink: Unnamed (1/2)#0] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task 
resources for Source: Custom Source -> Sink: Unnamed (1/2)#0 
(ca080a6391c7887c02a00ee3203b901a).
14:52:16,242 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state FAILED to JobManager for 
task Source: Custom Source -> Sink: Unnamed (1/2)#0 
ca080a6391c7887c02a00ee3203b901a.
14:52:16,265 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Cannot find 
task to fail for execution ca080a6391c7887c02a00ee3203b901a with exception:
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution 
attempt ca080a6391c7887c02a00ee3203b901a was not found.
{code}
I assume this is the expected behavior.
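
The sequence in the log above can be replayed with a small sketch. This is a 
simplified model, not Flink's actual classes: {{JobManagerView}}, {{replay}} 
and the idea that the JobManager forgets an attempt as soon as it is CANCELED 
are assumptions made only to mirror the order of the log lines.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CancelBeforeDeploySketch {

    /** Hypothetical, simplified JobManager-side bookkeeping of live attempts. */
    static final class JobManagerView {
        private final Set<String> liveAttempts = new HashSet<>();

        void deploy(String attemptId) { liveAttempts.add(attemptId); }

        // Assumption: a CANCELED attempt is no longer known to the JobManager.
        void cancel(String attemptId) { liveAttempts.remove(attemptId); }

        boolean knows(String attemptId) { return liveAttempts.contains(attemptId); }
    }

    /** Returns the state transitions the task goes through on the TaskManager. */
    static List<String> replay(String attemptId, boolean cancelBeforeArrival) {
        JobManagerView jm = new JobManagerView();
        jm.deploy(attemptId); // JM: CREATED -> DEPLOYING, deployment call in flight
        if (cancelBeforeArrival) {
            jm.cancel(attemptId); // JM: DEPLOYING -> CANCELING -> CANCELED
        }

        // The deployment call arrives on the TaskManager only now.
        List<String> taskStates = new ArrayList<>();
        taskStates.add("DEPLOYING");
        taskStates.add("RUNNING");

        // The task reports its state; an unknown attempt is failed externally.
        if (!jm.knows(attemptId)) {
            taskStates.add("FAILED");
        }
        return taskStates;
    }

    public static void main(String[] args) {
        System.out.println(replay("ca080a6391c7887c02a00ee3203b901a", true));
    }
}
```

With {{cancelBeforeArrival}} set, the task still reaches RUNNING on the 
TaskManager before it is failed, which matches the order seen in the log.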

> ReactiveModelITCase.testScaleDownOnTaskManagerLoss failed
> ---------------------------------------------------------
>
>                 Key: FLINK-21963
>                 URL: https://issues.apache.org/jira/browse/FLINK-21963
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination, Tests
>    Affects Versions: 1.13.0
>            Reporter: Matthias
>            Assignee: Robert Metzger
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.13.0
>
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=360&view=logs&j=e0582806-6d85-5dc5-7eb4-4289d3d0de6b&t=9fea6cf4-6ce3-5c26-d059-69f4d4cec7d1&l=4442]
>  failed (not exclusively) due to 
> {{ReactiveModelITCase.testScaleDownOnTaskManagerLoss}}.
> I was able to reproduce it locally with the {{DefaultScheduler}} enabled. 
> The test seems to get into an infinite loop:
> {code}
> [...]
> 76125 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state FAILED to JobManager for task Source: 
> Custom Source -> Sink: Unnamed (4/4)#8738 92b920a905c55fc85a76c79b3acef161.
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot aec00279d7404b26a104ee906695d27a.
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{0896a914cffb9d6631dc061ff4f485b4})
> 76125 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state Executing to Restarting.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink 
> Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state RUNNING 
> to CANCELLING.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (843d0c154f55a15a9bb1e705ae282032) switched 
> from RUNNING to CANCELING.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (ba0dd94db26abc376ee73522410b8094) switched 
> from RUNNING to CANCELING.
> 76125 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (92b920a905c55fc85a76c79b3acef161) switched 
> from RUNNING to CANCELING.
> 76126 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (ba0dd94db26abc376ee73522410b8094) switched 
> from CANCELING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (3/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot dde6780a1f8df3d0b1b1b454e28f8566.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{786f89cafa4833afb26d0eb5da265a38})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
> 'newResourcesAvailable' because the actual state is Restarting and not 
> ResourceConsumer.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
> results produced by task execution ba0dd94db26abc376ee73522410b8094.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (843d0c154f55a15a9bb1e705ae282032) switched 
> from CANCELING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (2/4) - execution 
> #8739 to FAILED while being CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot 25dd5bd3007772fe2cc69568cad2d882.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{86ab3cc76a17ec876b12bbaa6efcfa8c})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
> 'newResourcesAvailable' because the actual state is Restarting and not 
> ResourceConsumer.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
> results produced by task execution 843d0c154f55a15a9bb1e705ae282032.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (92b920a905c55fc85a76c79b3acef161) switched 
> from CANCELING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (4/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Returning logical slot to shared slot 
> (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot externally (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Free reserved slot 3315697ecf20a1249d7dad268892bcc9.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Release 
> shared slot (SlotRequestId{1fde7e6e5c69ce0ac831b5bc7de6a90d})
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Cannot run 
> 'newResourcesAvailable' because the actual state is Restarting and not 
> ResourceConsumer.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink 
> Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state 
> CANCELLING to CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - ExecutionGraph 
> 4b5f437c7c47c8be9f8d8bf08e78910a reached terminal state CANCELED.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping 
> checkpoint coordinator for job 4b5f437c7c47c8be9f8d8bf08e78910a.
> 76126 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the 
> results produced by task execution 92b920a905c55fc85a76c79b3acef161.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76126 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Ignoring 
> scheduled action because expected state 
> org.apache.flink.runtime.scheduler.adaptive.Executing@480dd446 is not the 
> actual state org.apache.flink.runtime.scheduler.adaptive.Restarting@64a59f58.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Declare new resource requirements for job 4b5f437c7c47c8be9f8d8bf08e78910a.
>       required resources: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=32768}]
>       acquired resources: 
> ResourceCounter{resources={ResourceProfile{UNKNOWN}=4}}
> 76126 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76126 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state Restarting to WaitingForResources.
> 76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76127 [SIGINT handler] WARN  org.apache.flink.util.TestSignalHandler [] - 
> RECEIVED SIGNAL 2: SIGINT. Shutting down as requested.
> 76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state WaitingForResources to CreatingExecutionGraph.
> 76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76127 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Running 
> initialization on master for job Flink Streaming Job 
> (4b5f437c7c47c8be9f8d8bf08e78910a).
> 76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution ba0dd94db26abc376ee73522410b8094.
> 76127 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - 
> Successfully ran initialization on master in 0 ms.
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Adding 1 
> vertices from job graph Flink Streaming Job 
> (4b5f437c7c47c8be9f8d8bf08e78910a).
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Attaching 1 
> topologically sorted vertices to existing job graph with 0 vertices and 0 
> intermediate results.
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Connecting 
> ExecutionJobVertex cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> 
> Sink: Unnamed) to 0 predecessors.
> 76127 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 92b920a905c55fc85a76c79b3acef161.
> 76127 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Cannot find task to 
> stop for execution 843d0c154f55a15a9bb1e705ae282032.
> 76127 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
> Built 4 pipelined regions in 0 ms
> 76127 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - 
> Successfully created execution graph from job graph Flink Streaming Job 
> (4b5f437c7c47c8be9f8d8bf08e78910a).
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (3/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'numberOfFailedCheckpoints'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointSize'. Metric will not be reported.[localhost, jobmanager, 
> Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointDuration'. Metric will not be reported.[localhost, jobmanager, 
> Flink Streaming Job]
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointProcessedData'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76127 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (4/4) - execution 
> #8738 to FAILED while being CANCELED.
> 76127 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointPersistedData'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'lastCheckpointExternalPath'. Metric will not be reported.[localhost, 
> jobmanager, Flink Streaming Job]
> 76128 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - No state 
> backend has been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@90aee29
> 76128 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - The 
> configuration state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76128 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Checkpoint 
> storage is set to JobManager
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring 
> transition of vertex Source: Custom Source -> Sink: Unnamed (2/4) - execution 
> #8739 to FAILED while being CANCELED.
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'restartingTime'. Metric will not be reported.[localhost, jobmanager, Flink 
> Streaming Job]
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 
> 'downtime'. Metric will not be reported.[localhost, jobmanager, Flink 
> Streaming Job]
> 76128 [jobmanager-future-thread-9] WARN  org.apache.flink.metrics.MetricGroup 
> [] - Name collision: Group already contains a Metric with the name 'uptime'. 
> Metric will not be reported.[localhost, jobmanager, Flink Streaming Job]
> 76128 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Status of the 
> shared state registry of job 4b5f437c7c47c8be9f8d8bf08e78910a after restore: 
> SharedStateRegistry{registeredStates={}}.
> 76128 [jobmanager-future-thread-9] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint 
> found during restore.
> 76128 [jobmanager-future-thread-9] DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Resetting the 
> master hooks.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink 
> Streaming Job (4b5f437c7c47c8be9f8d8bf08e78910a) switched from state CREATED 
> to RUNNING.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id aec00279d7404b26a104ee906695d27a.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{88b040e446ee408f792334a2ec437a42})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id 25dd5bd3007772fe2cc69568cad2d882.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{96939e4bb685186000b4001d96082081})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id dde6780a1f8df3d0b1b1b454e28f8566.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{c879c9aa7a7cb6dfbc12502ce7a8ed12})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool [] - 
> Reserve free slot with allocation id 3315697ecf20a1249d7dad268892bcc9.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - 
> Allocating logical slot from shared slot 
> (SlotRequestId{53a25924bcd9fe2db23c22e0bf17effe})
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - 
> Successfully reserved and assigned the required slots for the ExecutionGraph.
> 76128 [flink-akka.actor.default-dispatcher-3] DEBUG 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
> from state CreatingExecutionGraph to Executing.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (1/4) (attempt #8740) with attempt id 
> 9132ba5f6b087654fb351138ce74e710 to 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ 
> localhost (dataPort=-1) with allocation id aec00279d7404b26a104ee906695d27a
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (7384c32213a4e9cd3aa6ee5875b1e532) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (2/4) (attempt #8740) with attempt id 
> 7384c32213a4e9cd3aa6ee5875b1e532 to 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ 
> localhost (dataPort=-1) with allocation id 25dd5bd3007772fe2cc69568cad2d882
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (497dd733deeb255e05bae82f0e41527d) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (3/4) (attempt #8739) with attempt id 
> 497dd733deeb255e05bae82f0e41527d to 6cf04c09-5378-4bf3-aedd-e9a52076ec99 @ 
> localhost (dataPort=-1) with allocation id dde6780a1f8df3d0b1b1b454e28f8566
> 76128 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot aec00279d7404b26a104ee906695d27a.
> 76128 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot dde6780a1f8df3d0b1b1b454e28f8566.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (639cf6da5847c7f4250839aeb2552df9) switched 
> from CREATED to DEPLOYING.
> 76128 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
> Custom Source -> Sink: Unnamed (4/4) (attempt #8739) with attempt id 
> 639cf6da5847c7f4250839aeb2552df9 to 6cf04c09-5378-4bf3-aedd-e9a52076ec99 @ 
> localhost (dataPort=-1) with allocation id 3315697ecf20a1249d7dad268892bcc9
> 76128 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 2 under allocation id 
> dde6780a1f8df3d0b1b1b454e28f8566: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@6d90093b
> 76128 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 0 under allocation id 
> aec00279d7404b26a104ee906695d27a: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@64e96e04
> 76128 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d), 
> deploy into slot with allocation id dde6780a1f8df3d0b1b1b454e28f8566.
> 76129 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot 3315697ecf20a1249d7dad268892bcc9.
> 76129 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710), 
> deploy into slot with allocation id aec00279d7404b26a104ee906695d27a.
> 76129 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from CREATED 
> to DEPLOYING.
> 76129 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d) [DEPLOYING]
> 76129 [flink-akka.actor.default-dispatcher-2] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 3 under allocation id 
> 3315697ecf20a1249d7dad268892bcc9: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@23a8b928
> 76130 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from CREATED 
> to DEPLOYING.
> 76130 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [DEPLOYING]
> 76130 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate 
> slot 25dd5bd3007772fe2cc69568cad2d882.
> 76130 [TransientBlobCache shutdown hook] INFO  
> org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache
> 76130 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
> Shutting down TaskExecutorLocalStateStoresManager.
> 76130 [flink-akka.actor.default-dispatcher-4] DEBUG 
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Found 
> existing local state store for 4b5f437c7c47c8be9f8d8bf08e78910a - 
> cbc357ccb763df2852fee8c4fc7d55f2 - 1 under allocation id 
> 25dd5bd3007772fe2cc69568cad2d882: 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@389f2d5c
> 76130 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532), 
> deploy into slot with allocation id 25dd5bd3007772fe2cc69568cad2d882.
> 76130 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
> Custom Source -> Sink: Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9), 
> deploy into slot with allocation id 3315697ecf20a1249d7dad268892bcc9.
> 76131 [PermanentBlobCache shutdown hook] INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from CREATED 
> to DEPLOYING.
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9) [DEPLOYING]
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 497dd733deeb255e05bae82f0e41527d at library cache manager took 0 
> milliseconds
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 9132ba5f6b087654fb351138ce74e710 at library cache manager took 0 
> milliseconds
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 639cf6da5847c7f4250839aeb2552df9 at library cache manager took 0 
> milliseconds
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@7b20c610
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@252878a9
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@30eb707b
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76131 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76131 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from CREATED 
> to DEPLOYING.
> 76131 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Creating FileSystem stream 
> leak safety net for task Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING]
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task 
> Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING].
> 76131 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Getting user code class loader 
> for task 7384c32213a4e9cd3aa6ee5875b1e532 at library cache manager took 0 
> milliseconds
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: 
> Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [DEPLOYING].
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has 
> been configured, using default (HashMap) 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c1406b8
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - The configuration 
> state.checkpoint-storage has not be set in the current sessions 
> flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are 
> strongly encouraged explicitly set this configuration so they understand how 
> their applications are checkpointing snapshots for fault-tolerance.
> 76132 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage 
> is set to JobManager
> 76134 [FileChannelManagerImpl-io shutdown hook] INFO  
> org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - 
> FileChannelManager removed spill file directory 
> /var/folders/bd/6xl5m4z90j9438dv5bxg2n180000gn/T/junit2858931828265752695/junit7422691420779266555/flink-io-12fb17cf-712c-45f8-ab94-32fc7f5b5571
> 76136 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
> Shutting down TaskExecutorLocalStateStoresManager.
> 76136 [IOManagerAsync shutdown hook] DEBUG 
> org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Shutting down I/O 
> manager.
> 76136 [IOManagerAsync shutdown hook] DEBUG 
> org.apache.flink.runtime.io.disk.iomanager.IOManager [] - Shutting down I/O 
> manager.
> 76136 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (4/4)#8739.
> 76136 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740.
> 76136 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from DEPLOYING 
> to RUNNING.
> 76136 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740.
> 76136 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Initializing Source: 
> Custom Source -> Sink: Unnamed (3/4)#8739.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (2/4) (7384c32213a4e9cd3aa6ee5875b1e532) switched 
> from DEPLOYING to RUNNING.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (4/4) (639cf6da5847c7f4250839aeb2552df9) switched 
> from DEPLOYING to RUNNING.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched 
> from DEPLOYING to RUNNING.
> 76136 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (3/4) (497dd733deeb255e05bae82f0e41527d) switched 
> from DEPLOYING to RUNNING.
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (3/4)#8739
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (4/4)#8739
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(3/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(1/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(4/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Creating operator state backend for 
> StreamSink_7df19f87deec5680128845fd9a6ca18d_(2/4) with empty state.
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (3/4)#8739 (497dd733deeb255e05bae82f0e41527d) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (4/4)#8739 (639cf6da5847c7f4250839aeb2552df9) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (1/4)#8740 (9132ba5f6b087654fb351138ce74e710) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (4/4)#8739 
> (639cf6da5847c7f4250839aeb2552df9).
> 76137 [Source: Custom Source -> Sink: Unnamed (4/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (4/4)#8739 network resources (state: FAILED).
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710).
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (3/4)#8739 
> (497dd733deeb255e05bae82f0e41527d).
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (1/4)#8740 network resources (state: FAILED).
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] WARN  
> org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: 
> Unnamed (2/4)#8740 (7384c32213a4e9cd3aa6ee5875b1e532) switched from RUNNING 
> to FAILED with failure cause: java.lang.RuntimeException: Test error. More 
> instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:748)
> 76137 [Source: Custom Source -> Sink: Unnamed (3/4)#8739] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (3/4)#8739 network resources (state: FAILED).
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] INFO  
> org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for 
> Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532).
> 76137 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Release task Source: Custom 
> Source -> Sink: Unnamed (2/4)#8740 network resources (state: FAILED).
> 76137 [FileCache shutdown hook] INFO  
> org.apache.flink.runtime.filecache.FileCache [] - removed file cache 
> directory 
> /var/folders/bd/6xl5m4z90j9438dv5bxg2n180000gn/T/junit2858931828265752695/junit7422691420779266555/flink-dist-cache-13391a9e-181b-4d3c-b373-0ac0203f301e
> 76137 [Source: Custom Source -> Sink: Unnamed (1/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Ensuring all FileSystem 
> streams are closed for task Source: Custom Source -> Sink: Unnamed (1/4)#8740 
> (9132ba5f6b087654fb351138ce74e710) [FAILED]
> 76138 [Source: Custom Source -> Sink: Unnamed (2/4)#8740] DEBUG 
> org.apache.flink.runtime.taskmanager.Task [] - Ensuring all FileSystem 
> streams are closed for task Source: Custom Source -> Sink: Unnamed (2/4)#8740 
> (7384c32213a4e9cd3aa6ee5875b1e532) [FAILED]
> 76138 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state FAILED to JobManager for task Source: 
> Custom Source -> Sink: Unnamed (1/4)#8740 9132ba5f6b087654fb351138ce74e710.
> 76138 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state FAILED to JobManager for task Source: 
> Custom Source -> Sink: Unnamed (2/4)#8740 7384c32213a4e9cd3aa6ee5875b1e532.
> 76138 [flink-akka.actor.default-dispatcher-2] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
> Source -> Sink: Unnamed (1/4) (9132ba5f6b087654fb351138ce74e710) switched 
> from RUNNING to FAILED on 3ebdf185-dcde-4ad2-b567-2f14c1b86fd1 @ localhost 
> (dataPort=-1).
> java.lang.RuntimeException: Test error. More instances than expected.
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$InstanceTracker.reportNewInstance(ReactiveModeITCase.java:228)
>  ~[test-classes/:?]
>       at 
> org.apache.flink.test.scheduling.ReactiveModeITCase$ParallelismTrackingSink.open(ReactiveModeITCase.java:215)
>  ~[test-classes/:?]
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) 
> ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:550)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:540)
>  ~[classes/:?]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:580)
>  ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760) 
> ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> ~[classes/:?]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
