[ 
https://issues.apache.org/jira/browse/FLINK-7487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-7487:
-------------------------------
    Labels: test-stability  (was: )

> test instability in ClassLoaderITCase (no resources available)
> --------------------------------------------------------------
>
>                 Key: FLINK-7487
>                 URL: https://issues.apache.org/jira/browse/FLINK-7487
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>              Labels: test-stability
>
> This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103 
> which contains quite some changes but the error itself should be unrelated:
> {code}
> testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>   Time elapsed: 0.604 sec  <<< ERROR!
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>       at 
> org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> It seems that the job started in `testDisposeSavepointWithCustomKvState` is 
> not properly shut down after the test method exits and (parts of) it remain 
> and block resources for following tests. Copying the relevant parts of the 
> log here:
> {code}
> 13:46:30,887 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - 
> ================================================================================
> Test 
> testDisposeSavepointWithCustomKvState(org.apache.flink.test.classloading.ClassLoaderITCase)
>  is running.
> --------------------------------------------------------------------------------
> 13:46:30,891 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Starting program invoke thread
> 13:46:30,907 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Waiting for job status running.
> 13:46:30,960 INFO  org.apache.flink.runtime.client.JobClient                  
>    - Starting JobClient actor system
> 13:46:31,032 INFO  akka.event.slf4j.Slf4jLogger                               
>    - Slf4jLogger started
> 13:46:31,039 INFO  Remoting                                                   
>    - Starting remoting
> 13:46:31,099 INFO  org.apache.flink.runtime.client.JobClient                  
>    - Started JobClient actor system at 127.0.0.1:37545
> 13:46:31,100 INFO  Remoting                                                   
>    - Remoting started; listening on addresses 
> :[akka.tcp://flink@localhost:37545]
> 13:46:31,101 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Received SubmitJobAndWait(JobGraph(jobId: 
> 9d35089c4504acc95906d5300dbd4031)) but there is no connection to a JobManager 
> yet.
> 13:46:31,101 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Received job Flink Streaming Job (9d35089c4504acc95906d5300dbd4031).
> 13:46:31,101 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Disconnect from JobManager null.
> 13:46:31,125 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Connect to JobManager 
> Actor[akka.tcp://flink@localhost:36694/user/jobmanager#-1434086720].
> 13:46:31,126 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:36694/user/jobmanager#-1434086720] with 
> leader session id 749f16cd-6da4-4197-8e7e-8f6c13db4903.
> 13:46:31,126 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Sending message to JobManager 
> akka.tcp://flink@localhost:36694/user/jobmanager to submit job Flink 
> Streaming Job (9d35089c4504acc95906d5300dbd4031) and wait for progress
> 13:46:31,126 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Upload jar files to job manager 
> akka.tcp://flink@localhost:36694/user/jobmanager.
> 13:46:31,134 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Submit job to the job manager 
> akka.tcp://flink@localhost:36694/user/jobmanager.
> 13:46:31,137 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Submitting job 9d35089c4504acc95906d5300dbd4031 (Flink Streaming Job).
> 13:46:31,138 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=10000) for 9d35089c4504acc95906d5300dbd4031.
> 13:46:31,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job recovers via failover strategy: full graph restart
> 13:46:31,138 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Running initialization on master for job Flink Streaming Job 
> (9d35089c4504acc95906d5300dbd4031).
> 13:46:31,138 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Successfully ran initialization on master in 0 ms.
> 13:46:31,139 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Using application-defined state backend for checkpoint/savepoint 
> metadata: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,139 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Scheduling job 9d35089c4504acc95906d5300dbd4031 (Flink Streaming Job).
> 13:46:31,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job Flink Streaming Job (9d35089c4504acc95906d5300dbd4031) switched from 
> state CREATED to RUNNING.
> 13:46:31,145 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Job 9d35089c4504acc95906d5300dbd4031 was successfully submitted to the 
> JobManager akka://flink/deadLetters.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from CREATED to SCHEDULED.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from CREATED to SCHEDULED.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from CREATED to SCHEDULED.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from CREATED to SCHEDULED.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from CREATED to SCHEDULED.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from CREATED to SCHEDULED.
> 13:46:31,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from CREATED to SCHEDULED.
> 13:46:31,146 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from CREATED to SCHEDULED.
> 13:46:31,146 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,146 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Source: Custom Source -> Map (1/4) (attempt #0) to localhost
> 13:46:31,146 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Source: Custom Source -> Map (2/4) (attempt #0) to localhost
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Source: Custom Source -> Map (3/4) (attempt #0) to localhost
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Source: Custom Source -> Map (4/4) (attempt #0) to localhost
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Flat Map -> Sink: Unnamed (1/4) (attempt #0) to localhost
> 13:46:31,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,152 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Flat Map -> Sink: Unnamed (2/4) (attempt #0) to localhost
> 13:46:31,152 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Source: Custom Source -> Map (1/4)
> 13:46:31,152 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Job running. ID: 9d35089c4504acc95906d5300dbd4031
> 13:46:31,152 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Wait for all tasks to be running.
> 13:46:31,151 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Source: Custom Source -> Map (2/4)
> 13:46:31,151 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Job execution switched to status RUNNING.
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(1/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(2/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(3/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(4/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(1/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(2/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(3/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(4/4) switched to 
> SCHEDULED 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(1/4) switched to 
> DEPLOYING 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(2/4) switched to 
> DEPLOYING 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(3/4) switched to 
> DEPLOYING 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(4/4) switched to 
> DEPLOYING 
> 13:46:31,157 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(1/4) switched to 
> DEPLOYING 
> 13:46:31,158 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(2/4) switched to 
> DEPLOYING 
> 13:46:31,152 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,158 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Flat Map -> Sink: Unnamed (3/4) (attempt #0) to localhost
> 13:46:31,158 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:31,158 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Flat Map -> Sink: Unnamed (4/4) (attempt #0) to localhost
> 13:46:31,160 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(3/4) switched to 
> DEPLOYING 
> 13:46:31,161 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(4/4) switched to 
> DEPLOYING 
> 13:46:31,164 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from CREATED to DEPLOYING.
> 13:46:31,164 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Source: Custom 
> Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) [DEPLOYING]
> 13:46:31,172 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Source: Custom Source -> Map (3/4)
> 13:46:31,173 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from CREATED to DEPLOYING.
> 13:46:31,173 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Source: Custom 
> Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) [DEPLOYING]
> 13:46:31,174 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Flat Map -> Sink: Unnamed (1/4)
> 13:46:31,174 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from CREATED to DEPLOYING.
> 13:46:31,174 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Source: Custom 
> Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) [DEPLOYING]
> 13:46:31,176 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Flat Map -> Sink: Unnamed (2/4)
> 13:46:31,177 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from CREATED to DEPLOYING.
> 13:46:31,177 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Flat Map -> Sink: 
> Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) [DEPLOYING]
> 13:46:31,182 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Source: Custom Source -> Map (4/4)
> 13:46:31,182 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from CREATED to DEPLOYING.
> 13:46:31,182 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Flat Map -> Sink: 
> Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) [DEPLOYING]
> 13:46:31,183 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Flat Map -> Sink: Unnamed (3/4)
> 13:46:31,184 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from CREATED to DEPLOYING.
> 13:46:31,184 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Source: Custom 
> Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) [DEPLOYING]
> 13:46:31,187 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from CREATED to DEPLOYING.
> 13:46:31,187 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Flat Map -> Sink: 
> Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) [DEPLOYING]
> 13:46:31,192 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Source: Custom Source -> Map (1/4) 
> (96712295b7e1614df1540a735cb62940) [DEPLOYING].
> 13:46:31,192 INFO  org.apache.flink.runtime.blob.BlobClient                   
>    - Downloading 
> 9d35089c4504acc95906d5300dbd4031/1b1103f578e2bb36bd3e1d1c72f1174b90260ea4-9ba2618a66895ccd07df0e95c7c74cc1
>  from localhost/127.0.0.1:38845
> 13:46:31,193 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Flat Map -> Sink: Unnamed (4/4)
> 13:46:31,193 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Source: Custom Source -> Map (4/4) 
> (934e3cc3472bad094088cfd88452f8ec) [DEPLOYING].
> 13:46:31,193 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Flat Map -> Sink: Unnamed (2/4) 
> (996f597fe38c43e66ad1d21e595f04ee) [DEPLOYING].
> 13:46:31,193 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Flat Map -> Sink: Unnamed (3/4) 
> (0073828f2a22bb4853cc63c3e4e93732) [DEPLOYING].
> 13:46:31,194 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Flat Map -> Sink: Unnamed (1/4) 
> (cdf8ad6634e1b42a8e6c0dc6303ee999) [DEPLOYING].
> 13:46:31,194 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Source: Custom Source -> Map (3/4) 
> (8e2e046358c47844fa90f6aae8ff6f0a) [DEPLOYING].
> 13:46:31,193 INFO  org.apache.flink.runtime.blob.BlobClient                   
>    - Downloading 
> 9d35089c4504acc95906d5300dbd4031/1b1103f578e2bb36bd3e1d1c72f1174b90260ea4-9ba2618a66895ccd07df0e95c7c74cc1
>  from localhost/127.0.0.1:38845
> 13:46:31,194 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from CREATED to DEPLOYING.
> 13:46:31,194 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Flat Map -> Sink: 
> Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) [DEPLOYING]
> 13:46:31,194 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Flat Map -> Sink: Unnamed (4/4) 
> (cc54a39b19e3484b927aff66ea4bafb7) [DEPLOYING].
> 13:46:31,194 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Source: Custom Source -> Map (2/4) 
> (50c1bb7092b55ea8292c88e0180d0cc5) [DEPLOYING].
> 13:46:31,202 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Source: Custom Source -> Map (1/4) 
> (96712295b7e1614df1540a735cb62940) [DEPLOYING].
> 13:46:31,202 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Source: Custom Source -> Map (3/4) 
> (8e2e046358c47844fa90f6aae8ff6f0a) [DEPLOYING].
> 13:46:31,202 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Flat Map -> Sink: Unnamed (1/4) 
> (cdf8ad6634e1b42a8e6c0dc6303ee999) [DEPLOYING].
> 13:46:31,204 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Source: Custom Source -> Map (4/4) 
> (934e3cc3472bad094088cfd88452f8ec) [DEPLOYING].
> 13:46:31,204 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Flat Map -> Sink: Unnamed (4/4) 
> (cc54a39b19e3484b927aff66ea4bafb7) [DEPLOYING].
> 13:46:31,204 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Flat Map -> Sink: Unnamed (2/4) 
> (996f597fe38c43e66ad1d21e595f04ee) [DEPLOYING].
> 13:46:31,205 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Source: Custom Source -> Map (2/4) 
> (50c1bb7092b55ea8292c88e0180d0cc5) [DEPLOYING].
> 13:46:31,205 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,205 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Flat Map -> Sink: Unnamed (3/4) 
> (0073828f2a22bb4853cc63c3e4e93732) [DEPLOYING].
> 13:46:31,205 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,206 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,206 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,207 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,208 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,208 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,208 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,208 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,209 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,209 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,211 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,211 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,212 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,212 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,213 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,213 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,213 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(1/4) switched to 
> RUNNING 
> 13:46:31,213 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(3/4) switched to 
> RUNNING 
> 13:46:31,213 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(1/4) switched to 
> RUNNING 
> 13:46:31,220 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,221 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - Using user-defined state backend: File State Backend @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575.
> 13:46:31,221 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,221 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,221 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,221 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,225 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(2/4) switched to 
> RUNNING 
> 13:46:31,225 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(4/4) switched to 
> RUNNING 
> 13:46:31,225 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(4/4) switched to 
> RUNNING 
> 13:46:31,226 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Source: Custom Source -> Map(2/4) switched to 
> RUNNING 
> 13:46:31,228 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from DEPLOYING to RUNNING.
> 13:46:31,230 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:31        Flat Map -> Sink: Unnamed(3/4) switched to 
> RUNNING 
> 13:46:31,238 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Initializing heap keyed state backend with stream factory.
> 13:46:31,241 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Initializing heap keyed state backend with stream factory.
> 13:46:31,242 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Initializing heap keyed state backend with stream factory.
> 13:46:31,243 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Initializing heap keyed state backend with stream factory.
> 13:46:31,365 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - All tasks are running.
> 13:46:31,365 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (1/20).
> 13:46:31,378 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 1 @ 1503323191376
> 13:46:31,571 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-c800ee4b9956,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:31,615 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-c800ee4b9956,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:31,676 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-c800ee4b9956,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:31,679 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-c800ee4b9956,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:31,732 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-c800ee4b9956
> 13:46:31,732 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (2/20).
> 13:46:31,732 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 1 (6544 bytes in 348 ms).
> 13:46:31,742 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 2 @ 1503323191732
> 13:46:31,793 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3fe0dc1c6f9f,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:31,795 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3fe0dc1c6f9f,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,093 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3fe0dc1c6f9f,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,160 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3fe0dc1c6f9f,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,164 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3fe0dc1c6f9f
> 13:46:32,165 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (3/20).
> 13:46:32,165 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 2 (6544 bytes in 432 ms).
> 13:46:32,168 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 3 @ 1503323192165
> 13:46:32,220 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-34b6c3f8cb0e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,222 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-34b6c3f8cb0e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,620 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-34b6c3f8cb0e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,637 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-34b6c3f8cb0e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:32,646 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-34b6c3f8cb0e
> 13:46:32,646 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (4/20).
> 13:46:32,647 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 3 (6544 bytes in 481 ms).
> 13:46:32,648 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 4 @ 1503323192646
> 13:46:32,675 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-1450072cd754,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:32,677 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-1450072cd754,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,063 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-1450072cd754,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,089 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-1450072cd754,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 5 ms.
> 13:46:33,096 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 4 (6544 bytes in 450 ms).
> 13:46:33,099 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-1450072cd754
> 13:46:33,099 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (5/20).
> 13:46:33,100 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 5 @ 1503323193099
> 13:46:33,119 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d0cbbe1100b4,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,121 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d0cbbe1100b4,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,483 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d0cbbe1100b4,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 51 ms.
> 13:46:33,508 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d0cbbe1100b4,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 3 ms.
> 13:46:33,519 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 5 (6544 bytes in 420 ms).
> 13:46:33,519 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d0cbbe1100b4
> 13:46:33,519 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (6/20).
> 13:46:33,520 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 6 @ 1503323193519
> 13:46:33,562 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-4367fa0c7391,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,564 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-4367fa0c7391,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,892 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-4367fa0c7391,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 2 ms.
> 13:46:33,893 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-4367fa0c7391,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:33,899 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 6 (6544 bytes in 380 ms).
> 13:46:33,899 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-4367fa0c7391
> 13:46:33,899 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (7/20).
> 13:46:33,900 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 7 @ 1503323193899
> 13:46:33,914 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-03ac77b03ad3,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:33,915 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-03ac77b03ad3,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:34,298 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-03ac77b03ad3,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:34,358 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-03ac77b03ad3,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 8 ms.
> 13:46:34,361 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 7 (6544 bytes in 462 ms).
> 13:46:34,363 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-03ac77b03ad3
> 13:46:34,363 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (8/20).
> 13:46:34,366 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 8 @ 1503323194365
> 13:46:34,381 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-b0e995db01d8,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:34,382 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-b0e995db01d8,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:34,873 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-b0e995db01d8,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:34,935 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-b0e995db01d8,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:34,940 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 8 (6544 bytes in 575 ms).
> 13:46:34,941 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-b0e995db01d8
> 13:46:34,941 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (9/20).
> 13:46:34,944 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 9 @ 1503323194943
> 13:46:35,008 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-7f4047502898,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:35,009 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-7f4047502898,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:35,518 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-7f4047502898,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:35,545 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-7f4047502898,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:35,559 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 9 (6544 bytes in 615 ms).
> 13:46:35,559 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-7f4047502898
> 13:46:35,559 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (10/20).
> 13:46:35,559 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 10 @ 1503323195559
> 13:46:35,590 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3aa3180897da,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:35,592 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3aa3180897da,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:36,233 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3aa3180897da,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:36,301 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3aa3180897da,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 2 ms.
> 13:46:36,305 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 10 (6544 bytes in 746 ms).
> 13:46:36,305 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 11 @ 1503323196305
> 13:46:36,305 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-3aa3180897da
> 13:46:36,305 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (11/20).
> 13:46:36,317 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 12 @ 1503323196316
> 13:46:36,355 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:36,356 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-ea0e04cd15bf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:36,358 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:36,359 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-ea0e04cd15bf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:36,982 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:37,000 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-ea0e04cd15bf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:37,056 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 7 ms.
> 13:46:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 11 (6544 bytes in 753 ms).
> 13:46:37,095 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-ea0e04cd15bf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:37,110 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 12 (6544 bytes in 794 ms).
> 13:46:37,111 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-ea0e04cd15bf
> 13:46:37,111 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (12/20).
> 13:46:37,112 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 13 @ 1503323197111
> 13:46:37,341 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-cb72b0845f40,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:37,342 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-cb72b0845f40,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:37,740 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-cb72b0845f40,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:37,793 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-cb72b0845f40,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:37,797 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 13 (6544 bytes in 685 ms).
> 13:46:37,797 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-cb72b0845f40
> 13:46:37,797 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (13/20).
> 13:46:37,798 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 14 @ 1503323197798
> 13:46:37,819 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-205b213f0e0c,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:37,820 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-205b213f0e0c,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:38,379 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-205b213f0e0c,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:38,459 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-205b213f0e0c,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 3 ms.
> 13:46:38,463 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 14 (6544 bytes in 665 ms).
> 13:46:38,464 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-205b213f0e0c
> 13:46:38,464 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (14/20).
> 13:46:38,466 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 15 @ 1503323198466
> 13:46:38,494 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d4e2742ba705,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:38,496 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d4e2742ba705,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:39,067 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d4e2742ba705,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:39,105 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d4e2742ba705,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:39,112 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 15 (6544 bytes in 646 ms).
> 13:46:39,112 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-d4e2742ba705
> 13:46:39,112 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (15/20).
> 13:46:39,114 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 16 @ 1503323199114
> 13:46:39,132 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-88a07cc28daf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:39,133 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-88a07cc28daf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:39,692 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-88a07cc28daf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 3 ms.
> 13:46:39,723 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-88a07cc28daf,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 4 ms.
> 13:46:39,728 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 16 (6544 bytes in 614 ms).
> 13:46:39,728 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-88a07cc28daf
> 13:46:39,728 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (16/20).
> 13:46:39,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 17 @ 1503323199730
> 13:46:39,767 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-5d3d6d80abb5,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:39,768 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-5d3d6d80abb5,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:40,273 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-5d3d6d80abb5,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:40,332 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-5d3d6d80abb5,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:40,338 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 17 (6544 bytes in 608 ms).
> 13:46:40,339 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-5d3d6d80abb5
> 13:46:40,339 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (17/20).
> 13:46:40,340 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 18 @ 1503323200340
> 13:46:40,395 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-827ebbb3bc50,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:40,396 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-827ebbb3bc50,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:41,098 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-827ebbb3bc50,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 4 ms.
> 13:46:41,129 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-827ebbb3bc50,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:41,132 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 18 (6544 bytes in 792 ms).
> 13:46:41,132 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-827ebbb3bc50
> 13:46:41,132 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (18/20).
> 13:46:41,133 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 19 @ 1503323201133
> 13:46:41,164 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-f4bd9dcd370e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:41,165 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-f4bd9dcd370e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 2 ms.
> 13:46:41,793 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-f4bd9dcd370e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:41,828 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-f4bd9dcd370e,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:41,835 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 19 (6544 bytes in 702 ms).
> 13:46:41,835 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 20 @ 1503323201835
> 13:46:41,835 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-f4bd9dcd370e
> 13:46:41,835 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (19/20).
> 13:46:41,839 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 21 @ 1503323201835
> 13:46:41,859 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:41,861 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-28b09f34cf89,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:41,862 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:41,864 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-28b09f34cf89,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:42,342 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:42,360 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-28b09f34cf89,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:42,413 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit165943353187075575/9d35089c4504acc95906d5300dbd4031,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 2 ms.
> 13:46:42,419 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 20 (6544 bytes in 583 ms).
> 13:46:42,468 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-28b09f34cf89,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 9 ms.
> 13:46:42,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 21 (6544 bytes in 636 ms).
> 13:46:42,471 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-28b09f34cf89
> 13:46:42,471 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggering savepoint (20/20).
> 13:46:42,472 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Triggering checkpoint 22 @ 1503323202471
> 13:46:42,597 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (1/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:42,599 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (2/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:43,121 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (4/4),5,Flink 
> Task Threads] took 0 ms.
> 13:46:43,210 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend  
>    - Heap backend snapshot (File Stream Factory @ 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0,
>  synchronous part) in thread Thread[Flat Map -> Sink: Unnamed (3/4),5,Flink 
> Task Threads] took 1 ms.
> 13:46:43,215 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>    - Completed checkpoint 22 (6544 bytes in 743 ms).
> 13:46:43,215 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Triggered savepoint. Path: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0
> 13:46:43,215 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Disposing savepoint at 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0
> 13:46:43,217 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Disposing savepoint at 
> 'file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0'.
> 13:46:43,217 INFO  
> org.apache.flink.runtime.checkpoint.savepoint.SavepointStore  - Loading 
> savepoint from 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0
> 13:46:43,217 INFO  
> org.apache.flink.runtime.checkpoint.savepoint.SavepointStore  - Using 
> savepoint file in 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0/_metadata
> 13:46:43,233 INFO  
> org.apache.flink.runtime.checkpoint.savepoint.SavepointStore  - Removing 
> savepoint: 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0.
> 13:46:43,233 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - Disposed savepoint at 
> file:/tmp/junit6916765847938934377/junit3978153445614246595/savepoint-9d3508-2c11ba9942c0
> 13:46:43,234 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Trying to cancel job with ID 9d35089c4504acc95906d5300dbd4031.
> 13:46:43,235 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job Flink Streaming Job (9d35089c4504acc95906d5300dbd4031) switched from 
> state RUNNING to CANCELLING.
> 13:46:43,235 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from RUNNING to CANCELING.
> 13:46:43,235 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from RUNNING to CANCELING.
> 13:46:43,235 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from RUNNING to CANCELING.
> 13:46:43,235 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from RUNNING to CANCELING.
> 13:46:43,236 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from RUNNING to CANCELING.
> 13:46:43,236 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from RUNNING to CANCELING.
> 13:46:43,236 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from RUNNING to CANCELING.
> 13:46:43,236 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from RUNNING to CANCELING.
> 13:46:43,238 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - 
> --------------------------------------------------------------------------------
> Test 
> testDisposeSavepointWithCustomKvState(org.apache.flink.test.classloading.ClassLoaderITCase)
>  successfully run.
> ================================================================================
> 13:46:43,238 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - 
> ================================================================================
> Test 
> testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>  is running.
> --------------------------------------------------------------------------------
> 13:46:43,245 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Job execution switched to status CANCELLING.
> 13:46:43,245 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(1/4) switched to 
> CANCELING 
> 13:46:43,245 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(2/4) switched to 
> CANCELING 
> 13:46:43,245 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(3/4) switched to 
> CANCELING 
> 13:46:43,245 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(4/4) switched to 
> CANCELING 
> 13:46:43,245 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(1/4) switched to 
> CANCELING 
> 13:46:43,246 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(2/4) switched to 
> CANCELING 
> 13:46:43,246 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(3/4) switched to 
> CANCELING 
> 13:46:43,246 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(4/4) switched to 
> CANCELING 
> 13:46:43,246 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Source: Custom Source -> Map (1/4) 
> (96712295b7e1614df1540a735cb62940).
> 13:46:43,246 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from RUNNING to CANCELING.
> 13:46:43,246 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Source: Custom Source -> Map (1/4) 
> (96712295b7e1614df1540a735cb62940).
> 13:46:43,247 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Source: Custom Source -> Map (2/4) 
> (50c1bb7092b55ea8292c88e0180d0cc5).
> 13:46:43,247 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (2/4) (50c1bb7092b55ea8292c88e0180d0cc5) 
> switched from RUNNING to CANCELING.
> 13:46:43,247 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Source: Custom Source -> Map (2/4) 
> (50c1bb7092b55ea8292c88e0180d0cc5).
> 13:46:43,265 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Source: Custom Source -> Map (3/4) 
> (8e2e046358c47844fa90f6aae8ff6f0a).
> 13:46:43,265 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from RUNNING to CANCELING.
> 13:46:43,266 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Source: Custom Source -> Map (3/4) 
> (8e2e046358c47844fa90f6aae8ff6f0a).
> 13:46:43,272 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Flat Map -> Sink: Unnamed (1/4) 
> (cdf8ad6634e1b42a8e6c0dc6303ee999).
> 13:46:43,272 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from RUNNING to CANCELING.
> 13:46:43,272 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Flat Map -> Sink: Unnamed (1/4) 
> (cdf8ad6634e1b42a8e6c0dc6303ee999).
> 13:46:43,282 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Flat Map -> Sink: Unnamed (2/4) 
> (996f597fe38c43e66ad1d21e595f04ee).
> 13:46:43,283 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from RUNNING to CANCELING.
> 13:46:43,283 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Flat Map -> Sink: Unnamed (2/4) 
> (996f597fe38c43e66ad1d21e595f04ee).
> 13:46:43,355 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Source: Custom Source -> Map (4/4) 
> (934e3cc3472bad094088cfd88452f8ec).
> 13:46:43,355 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from RUNNING to CANCELING.
> 13:46:43,356 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Source: Custom Source -> Map (4/4) 
> (934e3cc3472bad094088cfd88452f8ec).
> 13:46:43,359 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Flat Map -> Sink: Unnamed (3/4) 
> (0073828f2a22bb4853cc63c3e4e93732).
> 13:46:43,359 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from RUNNING to CANCELING.
> 13:46:43,359 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Flat Map -> Sink: Unnamed (3/4) 
> (0073828f2a22bb4853cc63c3e4e93732).
> 13:46:43,369 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Flat Map -> Sink: Unnamed (4/4) 
> (cc54a39b19e3484b927aff66ea4bafb7).
> 13:46:43,369 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from RUNNING to CANCELING.
> 13:46:43,370 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code Flat Map -> Sink: Unnamed (4/4) 
> (cc54a39b19e3484b927aff66ea4bafb7).
> 13:46:43,375 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from CANCELING to CANCELED.
> 13:46:43,375 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Source: Custom Source -> Map (3/4) 
> (8e2e046358c47844fa90f6aae8ff6f0a).
> 13:46:43,377 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Source: Custom 
> Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) [CANCELED]
> 13:46:43,378 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from CANCELING to CANCELED.
> 13:46:43,378 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Flat Map -> Sink: Unnamed (1/4) 
> (cdf8ad6634e1b42a8e6c0dc6303ee999).
> 13:46:43,379 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Source: Custom Source -> Map 
> (8e2e046358c47844fa90f6aae8ff6f0a)
> 13:46:43,380 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (3/4) (8e2e046358c47844fa90f6aae8ff6f0a) 
> switched from CANCELING to CANCELED.
> 13:46:43,381 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(3/4) switched to 
> CANCELED 
> 13:46:43,408 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Flat Map -> Sink: 
> Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) [CANCELED]
> 13:46:43,408 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Flat Map -> Sink: Unnamed 
> (cdf8ad6634e1b42a8e6c0dc6303ee999)
> 13:46:43,410 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (1/4) (cdf8ad6634e1b42a8e6c0dc6303ee999) 
> switched from CANCELING to CANCELED.
> 13:46:43,411 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(1/4) switched to 
> CANCELED 
> 13:46:43,412 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from CANCELING to CANCELED.
> 13:46:43,412 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Source: Custom Source -> Map (1/4) 
> (96712295b7e1614df1540a735cb62940).
> 13:46:43,412 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Source: Custom 
> Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) [CANCELED]
> 13:46:43,412 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Source: Custom Source -> Map 
> (96712295b7e1614df1540a735cb62940)
> 13:46:43,412 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from CANCELING to CANCELED.
> 13:46:43,412 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Source: Custom Source -> Map (4/4) 
> (934e3cc3472bad094088cfd88452f8ec).
> 13:46:43,413 WARN  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error 
> while emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:468)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:884)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>       at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:884)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:739)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:722)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:465)
>       ... 10 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:205)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>       at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>       ... 14 more
> 13:46:43,414 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (1/4) (96712295b7e1614df1540a735cb62940) 
> switched from CANCELING to CANCELED.
> 13:46:43,414 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Source: Custom 
> Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) [CANCELED]
> 13:46:43,415 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Source: Custom Source -> Map 
> (934e3cc3472bad094088cfd88452f8ec)
> 13:46:43,421 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from CANCELING to CANCELED.
> 13:46:43,421 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Flat Map -> Sink: Unnamed (3/4) 
> (0073828f2a22bb4853cc63c3e4e93732).
> 13:46:43,421 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Flat Map -> Sink: 
> Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) [CANCELED]
> 13:46:43,422 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from CANCELING to CANCELED.
> 13:46:43,422 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Flat Map -> Sink: Unnamed (4/4) 
> (cc54a39b19e3484b927aff66ea4bafb7).
> 13:46:43,422 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Flat Map -> Sink: 
> Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) [CANCELED]
> 13:46:43,422 INFO  org.apache.flink.api.java.ExecutionEnvironment             
>    - The job has 0 registered types and 0 default Kryo serializers
> 13:46:43,423 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Flat Map -> Sink: Unnamed 
> (0073828f2a22bb4853cc63c3e4e93732)
> 13:46:43,423 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Flat Map -> Sink: Unnamed 
> (cc54a39b19e3484b927aff66ea4bafb7)
> 13:46:43,428 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from CANCELING to CANCELED.
> 13:46:43,428 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Flat Map -> Sink: Unnamed (2/4) 
> (996f597fe38c43e66ad1d21e595f04ee).
> 13:46:43,428 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(1/4) switched to 
> CANCELED 
> 13:46:43,431 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Source: Custom Source -> Map (4/4) (934e3cc3472bad094088cfd88452f8ec) 
> switched from CANCELING to CANCELED.
> 13:46:43,436 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (3/4) (0073828f2a22bb4853cc63c3e4e93732) 
> switched from CANCELING to CANCELED.
> 13:46:43,436 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (4/4) (cc54a39b19e3484b927aff66ea4bafb7) 
> switched from CANCELING to CANCELED.
> 13:46:43,439 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Flat Map -> Sink: 
> Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) [CANCELED]
> 13:46:43,440 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Flat Map -> Sink: Unnamed 
> (996f597fe38c43e66ad1d21e595f04ee)
> 13:46:43,441 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Source: Custom Source -> Map(4/4) switched to 
> CANCELED 
> 13:46:43,441 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(3/4) switched to 
> CANCELED 
> 13:46:43,442 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(4/4) switched to 
> CANCELED 
> 13:46:43,444 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Flat Map -> Sink: Unnamed (2/4) (996f597fe38c43e66ad1d21e595f04ee) 
> switched from CANCELING to CANCELED.
> 13:46:43,445 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Flat Map -> Sink: Unnamed(2/4) switched to 
> CANCELED 
> 13:46:43,448 WARN  org.apache.flink.optimizer.Optimizer                       
>    - The parallelism of nested dataflows (such as step functions in 
> iterations) is currently fixed to the parallelism of the surrounding operator 
> (the iteration).
> 13:46:43,448 WARN  org.apache.flink.optimizer.Optimizer                       
>    - The parallelism of nested dataflows (such as step functions in 
> iterations) is currently fixed to the parallelism of the surrounding operator 
> (the iteration).
> 13:46:43,448 WARN  org.apache.flink.optimizer.Optimizer                       
>    - The parallelism of nested dataflows (such as step functions in 
> iterations) is currently fixed to the parallelism of the surrounding operator 
> (the iteration).
> 13:46:43,511 INFO  org.apache.flink.runtime.client.JobClient                  
>    - Starting JobClient actor system
> 13:46:43,559 INFO  akka.event.slf4j.Slf4jLogger                               
>    - Slf4jLogger started
> 13:46:43,570 INFO  Remoting                                                   
>    - Starting remoting
> 13:46:43,640 INFO  org.apache.flink.runtime.client.JobClient                  
>    - Started JobClient actor system at 127.0.0.1:36213
> 13:46:43,641 INFO  Remoting                                                   
>    - Remoting started; listening on addresses 
> :[akka.tcp://flink@localhost:36213]
> 13:46:43,642 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Received SubmitJobAndWait(JobGraph(jobId: 
> f5bc2c5b854c77e681fb101b22b901f3)) but there is no connection to a JobManager 
> yet.
> 13:46:43,650 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Received job Flink Java Job at Mon Aug 21 13:46:43 UTC 2017 
> (f5bc2c5b854c77e681fb101b22b901f3).
> 13:46:43,650 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Disconnect from JobManager null.
> 13:46:43,684 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Connect to JobManager 
> Actor[akka.tcp://flink@localhost:36694/user/jobmanager#-1434086720].
> 13:46:43,684 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:36694/user/jobmanager#-1434086720] with 
> leader session id 749f16cd-6da4-4197-8e7e-8f6c13db4903.
> 13:46:43,684 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Sending message to JobManager 
> akka.tcp://flink@localhost:36694/user/jobmanager to submit job Flink Java Job 
> at Mon Aug 21 13:46:43 UTC 2017 (f5bc2c5b854c77e681fb101b22b901f3) and wait 
> for progress
> 13:46:43,684 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Upload jar files to job manager 
> akka.tcp://flink@localhost:36694/user/jobmanager.
> 13:46:43,692 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Submit job to the job manager 
> akka.tcp://flink@localhost:36694/user/jobmanager.
> 13:46:43,697 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Submitting job f5bc2c5b854c77e681fb101b22b901f3 (Flink Java Job at Mon 
> Aug 21 13:46:43 UTC 2017).
> 13:46:43,698 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Using restart strategy NoRestartStrategy for 
> f5bc2c5b854c77e681fb101b22b901f3.
> 13:46:43,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job recovers via failover strategy: full graph restart
> 13:46:43,700 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Running initialization on master for job Flink Java Job at Mon Aug 21 
> 13:46:43 UTC 2017 (f5bc2c5b854c77e681fb101b22b901f3).
> 13:46:43,703 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Successfully ran initialization on master in 3 ms.
> 13:46:43,707 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager    
>    - Scheduling job f5bc2c5b854c77e681fb101b22b901f3 (Flink Java Job at Mon 
> Aug 21 13:46:43 UTC 2017).
> 13:46:43,707 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job Flink Java Job at Mon Aug 21 13:46:43 UTC 2017 
> (f5bc2c5b854c77e681fb101b22b901f3) switched from state CREATED to RUNNING.
> 13:46:43,707 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from CREATED to SCHEDULED.
> 13:46:43,707 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from SCHEDULED to DEPLOYING.
> 13:46:43,707 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) (attempt #0) to 
> localhost
> 13:46:43,708 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from CREATED to SCHEDULED.
> 13:46:43,708 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from SCHEDULED to DEPLOYING.
> 13:46:43,708 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (attempt #0) to localhost
> 13:46:43,713 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Job f5bc2c5b854c77e681fb101b22b901f3 was successfully submitted to the 
> JobManager akka://flink/deadLetters.
> 13:46:43,713 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Job execution switched to status RUNNING.
> 13:46:43,713 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> SCHEDULED 
> 13:46:43,713 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> DEPLOYING 
> 13:46:43,713 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70))(1/1) switched to SCHEDULED 
> 13:46:43,713 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70))(1/1) switched to DEPLOYING 
> 13:46:43,713 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)
> 13:46:43,721 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1)
> 13:46:43,722 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from CREATED to DEPLOYING.
> 13:46:43,722 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task DataSource (at 
> main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) [DEPLOYING]
> 13:46:43,722 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) [DEPLOYING].
> 13:46:43,722 INFO  org.apache.flink.runtime.blob.BlobClient                   
>    - Downloading 
> f5bc2c5b854c77e681fb101b22b901f3/615339cdfe58f938dc338387ff3283e2681033ef-275e3647c07311545433ce37125bb403
>  from localhost/127.0.0.1:38845
> 13:46:43,726 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from CREATED to DEPLOYING.
> 13:46:43,726 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> [DEPLOYING]
> 13:46:43,726 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> [DEPLOYING].
> 13:46:43,738 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) [DEPLOYING].
> 13:46:43,739 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from DEPLOYING to RUNNING.
> 13:46:43,740 WARN  org.apache.flink.metrics.MetricGroup                       
>    - The operator name DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 
> characters length limit and was truncated.
> 13:46:43,740 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> [DEPLOYING].
> 13:46:43,742 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from DEPLOYING to RUNNING.
> 13:46:43,742 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from DEPLOYING to RUNNING.
> 13:46:43,743 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> RUNNING 
> 13:46:43,743 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from DEPLOYING to RUNNING.
> 13:46:43,747 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70))(1/1) switched to RUNNING 
> 13:46:43,753 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from RUNNING to FINISHED.
> 13:46:43,753 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6).
> 13:46:43,753 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task DataSource (at 
> main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) [FINISHED]
> 13:46:43,755 WARN  org.apache.flink.metrics.MetricGroup                       
>    - The operator name DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 
> characters length limit and was truncated.
> 13:46:43,757 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state FINISHED to 
> JobManager for task DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) 
> (645c08cdde28c65ba17b32c62984c3e6)
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from CREATED to SCHEDULED.
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from SCHEDULED to DEPLOYING.
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Map (Map at main(KMeansForTest.java:67)) (1/4) (attempt #0) to 
> localhost
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from CREATED to SCHEDULED.
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from SCHEDULED to DEPLOYING.
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Map (Map at main(KMeansForTest.java:67)) (2/4) (attempt #0) to 
> localhost
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb) switched from CREATED to SCHEDULED.
> 13:46:43,758 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb) switched from SCHEDULED to DEPLOYING.
> 13:46:43,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Map (Map at main(KMeansForTest.java:67)) (3/4) (attempt #0) to 
> localhost
> 13:46:43,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (4/4) 
> (6d3cc6b1e45864496ddc4f7f43152c74) switched from CREATED to SCHEDULED.
> 13:46:43,760 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1) 
> (645c08cdde28c65ba17b32c62984c3e6) switched from RUNNING to FINISHED.
> 13:46:43,763 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Map (Map at main(KMeansForTest.java:67)) (3/4)
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(1/4) 
> switched to SCHEDULED 
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(1/4) 
> switched to DEPLOYING 
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(2/4) 
> switched to SCHEDULED 
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(2/4) 
> switched to DEPLOYING 
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(3/4) 
> switched to SCHEDULED 
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(3/4) 
> switched to DEPLOYING 
> 13:46:43,763 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(4/4) 
> switched to SCHEDULED 
> 13:46:43,765 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSource (at main(KMeansForTest.java:66) 
> (org.apache.flink.api.java.io.CollectionInputFormat))(1/1) switched to 
> FINISHED 
> 13:46:43,765 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (4/4) 
> (6d3cc6b1e45864496ddc4f7f43152c74) switched from SCHEDULED to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 13:46:43,766 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job Flink Java Job at Mon Aug 21 13:46:43 UTC 2017 
> (f5bc2c5b854c77e681fb101b22b901f3) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 13:46:43,768 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(4/4) 
> switched to FAILED 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 13:46:43,768 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from RUNNING to CANCELING.
> 13:46:43,768 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from DEPLOYING to CANCELING.
> 13:46:43,769 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from DEPLOYING to CANCELING.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb) switched from DEPLOYING to CANCELING.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - PartialSolution (Bulk Iteration) (1/4) 
> (3dd9e34eb5884b9f11533ffd7926690c) switched from CREATED to CANCELED.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - PartialSolution (Bulk Iteration) (2/4) 
> (62b7104ee7ab6783e5d4454cc76f2dd3) switched from CREATED to CANCELED.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - PartialSolution (Bulk Iteration) (3/4) 
> (90413c0e8f06c38bf43cc5921e15368b) switched from CREATED to CANCELED.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - PartialSolution (Bulk Iteration) (4/4) 
> (5221a62bfc8d83916053fa323598b135) switched from CREATED to CANCELED.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Map (Map at main(KMeansForTest.java:77)) -> Map (Map at 
> main(KMeansForTest.java:80)) -> Combine (Reduce at 
> main(KMeansForTest.java:83)) (1/4) (d4c338e63a4ac75965318187c1ac18d3) 
> switched from CREATED to CANCELED.
> 13:46:43,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Map (Map at main(KMeansForTest.java:77)) -> Map (Map at 
> main(KMeansForTest.java:80)) -> Combine (Reduce at 
> main(KMeansForTest.java:83)) (2/4) (23f4af83429773afafe63fd31f554e13) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Map (Map at main(KMeansForTest.java:77)) -> Map (Map at 
> main(KMeansForTest.java:80)) -> Combine (Reduce at 
> main(KMeansForTest.java:83)) (3/4) (1219996f868398004c7e5149d230e7f6) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Map (Map at main(KMeansForTest.java:77)) -> Map (Map at 
> main(KMeansForTest.java:80)) -> Combine (Reduce at 
> main(KMeansForTest.java:83)) (4/4) (6554aa2f945b983d4f54665838674fa4) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Reduce (Reduce at main(KMeansForTest.java:83)) -> Map (Map at 
> main(KMeansForTest.java:86)) (1/4) (99858022a1ec36ecc2452d689e1e4b83) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Reduce (Reduce at main(KMeansForTest.java:83)) -> Map (Map at 
> main(KMeansForTest.java:86)) (2/4) (a9b8a7d86ac4b95c6474961c6edf60e5) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Reduce (Reduce at main(KMeansForTest.java:83)) -> Map (Map at 
> main(KMeansForTest.java:86)) (3/4) (690ea9bb98a29f83d8ae4089ebcd15a1) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN Reduce (Reduce at main(KMeansForTest.java:83)) -> Map (Map at 
> main(KMeansForTest.java:86)) (4/4) (a638a991b92ebdd5989700e6dff49aa9) 
> switched from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSink (collect()) (1/4) (7e3f3acab4d91d0b5c053df7b0ba3b6e) switched 
> from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSink (collect()) (2/4) (2ef1a619dd2c9225e8dc42c25392df82) switched 
> from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSink (collect()) (3/4) (1c8b7447e27552b2fade7782faa1827e) switched 
> from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - DataSink (collect()) (4/4) (08a752440c86094fe1b5b5e491d9c625) switched 
> from CREATED to CANCELED.
> 13:46:43,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Sync(Bulk Iteration) (1/1) (5a73a27d051b2e6cbbb02c70bcb702c9) switched 
> from CREATED to CANCELED.
> 13:46:43,773 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Map (Map at main(KMeansForTest.java:67)) (1/4)
> 13:46:43,778 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Received task Map (Map at main(KMeansForTest.java:67)) (2/4)
> 13:46:43,778 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from CREATED to DEPLOYING.
> 13:46:43,778 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Map (Map at 
> main(KMeansForTest.java:67)) (1/4) (9043b6f6fc62294c417c8ba83949fe0e) 
> [DEPLOYING]
> 13:46:43,778 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Map (Map at main(KMeansForTest.java:67)) 
> (1/4) (9043b6f6fc62294c417c8ba83949fe0e) [DEPLOYING].
> 13:46:43,778 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Map (Map at main(KMeansForTest.java:67)) 
> (1/4) (9043b6f6fc62294c417c8ba83949fe0e) [DEPLOYING].
> 13:46:43,778 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Discarding the results produced by task execution 
> 645c08cdde28c65ba17b32c62984c3e6
> 13:46:43,779 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811).
> 13:46:43,779 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from RUNNING to CANCELING.
> 13:46:43,779 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Triggering cancellation of task code CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811).
> 13:46:43,779 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from CREATED to DEPLOYING.
> 13:46:43,779 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Creating FileSystem stream leak safety net for task Map (Map at 
> main(KMeansForTest.java:67)) (2/4) (994c4d59f9922e25fed490ebae531e8b) 
> [DEPLOYING]
> 13:46:43,779 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Map (Map at main(KMeansForTest.java:67)) 
> (2/4) (994c4d59f9922e25fed490ebae531e8b) [DEPLOYING].
> 13:46:43,780 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Map (Map at main(KMeansForTest.java:67)) 
> (2/4) (994c4d59f9922e25fed490ebae531e8b) [DEPLOYING].
> 13:46:43,780 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e).
> 13:46:43,780 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from DEPLOYING to CANCELING.
> 13:46:43,780 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b).
> 13:46:43,780 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from DEPLOYING to CANCELING.
> 13:46:43,781 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from CANCELING to CANCELED.
> 13:46:43,781 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811).
> 13:46:43,781 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> [CANCELED]
> 13:46:43,781 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (ee3fbb6a9a391e1c38b054a0ce7c3811)
> 13:46:43,783 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Attempting to cancel task Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb).
> 13:46:43,783 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb) switched from CREATED to CANCELING.
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Job execution switched to status FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70))(1/1) switched to CANCELING 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(1/4) 
> switched to CANCELING 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(2/4) 
> switched to CANCELING 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(3/4) 
> switched to CANCELING 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        PartialSolution (Bulk Iteration)(1/4) 
> switched to CANCELED 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        PartialSolution (Bulk Iteration)(2/4) 
> switched to CANCELED 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        PartialSolution (Bulk Iteration)(3/4) 
> switched to CANCELED 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        PartialSolution (Bulk Iteration)(4/4) 
> switched to CANCELED 
> 13:46:43,784 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Map (Map at 
> main(KMeansForTest.java:77)) -> Map (Map at main(KMeansForTest.java:80)) -> 
> Combine (Reduce at main(KMeansForTest.java:83))(1/4) switched to CANCELED 
> 13:46:43,785 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Map (Map at 
> main(KMeansForTest.java:77)) -> Map (Map at main(KMeansForTest.java:80)) -> 
> Combine (Reduce at main(KMeansForTest.java:83))(2/4) switched to CANCELED 
> 13:46:43,785 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Map (Map at 
> main(KMeansForTest.java:77)) -> Map (Map at main(KMeansForTest.java:80)) -> 
> Combine (Reduce at main(KMeansForTest.java:83))(3/4) switched to CANCELED 
> 13:46:43,785 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Map (Map at 
> main(KMeansForTest.java:77)) -> Map (Map at main(KMeansForTest.java:80)) -> 
> Combine (Reduce at main(KMeansForTest.java:83))(4/4) switched to CANCELED 
> 13:46:43,785 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Reduce (Reduce at 
> main(KMeansForTest.java:83)) -> Map (Map at main(KMeansForTest.java:86))(1/4) 
> switched to CANCELED 
> 13:46:43,785 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Reduce (Reduce at 
> main(KMeansForTest.java:83)) -> Map (Map at main(KMeansForTest.java:86))(2/4) 
> switched to CANCELED 
> 13:46:43,785 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Reduce (Reduce at 
> main(KMeansForTest.java:83)) -> Map (Map at main(KMeansForTest.java:86))(3/4) 
> switched to CANCELED 
> 13:46:43,791 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from CANCELING to CANCELED.
> 13:46:43,791 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Map (Map at main(KMeansForTest.java:67)) 
> (1/4) (9043b6f6fc62294c417c8ba83949fe0e).
> 13:46:43,791 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Map (Map at 
> main(KMeansForTest.java:67)) (1/4) (9043b6f6fc62294c417c8ba83949fe0e) 
> [CANCELED]
> 13:46:43,792 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from CANCELING to CANCELED.
> 13:46:43,792 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Map (Map at main(KMeansForTest.java:67)) 
> (2/4) (994c4d59f9922e25fed490ebae531e8b).
> 13:46:43,792 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb) switched from CANCELING to CANCELED.
> 13:46:43,792 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Map (Map at main(KMeansForTest.java:67)) 
> (9043b6f6fc62294c417c8ba83949fe0e)
> 13:46:43,793 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN Reduce (Reduce at 
> main(KMeansForTest.java:83)) -> Map (Map at main(KMeansForTest.java:86))(4/4) 
> switched to CANCELED 
> 13:46:43,795 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSink (collect())(1/4) switched to 
> CANCELED 
> 13:46:43,795 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSink (collect())(2/4) switched to 
> CANCELED 
> 13:46:43,795 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSink (collect())(3/4) switched to 
> CANCELED 
> 13:46:43,796 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        DataSink (collect())(4/4) switched to 
> CANCELED 
> 13:46:43,796 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Sync(Bulk Iteration)(1/1) switched to 
> CANCELED 
> 13:46:43,795 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (1/4) 
> (9043b6f6fc62294c417c8ba83949fe0e) switched from CANCELING to CANCELED.
> 13:46:43,795 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - CHAIN DataSource (at main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70)) (1/1) (ee3fbb6a9a391e1c38b054a0ce7c3811) 
> switched from CANCELING to CANCELED.
> 13:46:43,793 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Map (Map at main(KMeansForTest.java:67)) 
> (f4cc4811c33f54e46247f136260c7ddb)
> 13:46:43,798 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (3/4) 
> (f4cc4811c33f54e46247f136260c7ddb) switched from CANCELING to CANCELED.
> 13:46:43,800 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(1/4) 
> switched to CANCELED 
> 13:46:43,800 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Ensuring all FileSystem streams are closed for task Map (Map at 
> main(KMeansForTest.java:67)) (2/4) (994c4d59f9922e25fed490ebae531e8b) 
> [CANCELED]
> 13:46:43,801 INFO  org.apache.flink.runtime.testingUtils.TestingTaskManager   
>    - Un-registering task and sending final execution state CANCELED to 
> JobManager for task Map (Map at main(KMeansForTest.java:67)) 
> (994c4d59f9922e25fed490ebae531e8b)
> 13:46:43,802 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        CHAIN DataSource (at 
> main(KMeansForTest.java:69) 
> (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
> main(KMeansForTest.java:70))(1/1) switched to CANCELED 
> 13:46:43,802 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Map (Map at main(KMeansForTest.java:67)) (2/4) 
> (994c4d59f9922e25fed490ebae531e8b) switched from CANCELING to CANCELED.
> 13:46:43,802 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Try to restart or fail the job Flink Java Job at Mon Aug 21 13:46:43 UTC 
> 2017 (f5bc2c5b854c77e681fb101b22b901f3) if no longer possible.
> 13:46:43,802 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Job Flink Java Job at Mon Aug 21 13:46:43 UTC 2017 
> (f5bc2c5b854c77e681fb101b22b901f3) switched from state FAILING to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 13:46:43,802 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Could not restart the job Flink Java Job at Mon Aug 21 13:46:43 UTC 2017 
> (f5bc2c5b854c77e681fb101b22b901f3) because the restart strategy prevented it.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 13:46:43,802 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(3/4) 
> switched to CANCELED 
> 13:46:43,805 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Map (Map at main(KMeansForTest.java:67))(2/4) 
> switched to CANCELED 
> 13:46:43,806 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - 08/21/2017 13:46:43        Job execution switched to status FAILED.
> 13:46:43,808 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Terminate JobClientActor.
> 13:46:43,808 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor   
>    - Disconnect from JobManager 
> Actor[akka.tcp://flink@localhost:36694/user/jobmanager#-1434086720].
> 13:46:43,809 INFO  org.apache.flink.runtime.client.JobClient                  
>    - Job execution failed
> 13:46:43,811 ERROR org.apache.flink.test.classloading.ClassLoaderITCase       
>    - 
> --------------------------------------------------------------------------------
> Test 
> testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>  failed with:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>       at 
> org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> ================================================================================
> 13:46:43,812 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator      
>    - Shutting down remote daemon.
> 13:46:43,819 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator      
>    - Remote daemon shut down; proceeding with flushing remote transports.
> 13:46:43,827 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator      
>    - Remoting shut down.
> 13:46:43,843 INFO  org.apache.flink.test.classloading.ClassLoaderITCase       
>    - 
> ================================================================================
> Test 
> testStreamingCustomSplitJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>  is running.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to