[ 
https://issues.apache.org/jira/browse/BEAM-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434057#comment-17434057
 ] 

Valentyn Tymofieiev commented on BEAM-12291:
--------------------------------------------

Saw a similar error in org.apache.beam.runners.flink.PortableStateExecutionTest.testExecution[streaming: false]


{noformat}

Error Message
org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds
Stacktrace
org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds
        at java.lang.Thread.sleep(Native Method)
        at org.apache.beam.runners.flink.PortableStateExecutionTest.testExecution(PortableStateExecutionTest.java:206)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
        at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
Standard Output
Shutting SDK harness down.
Shutting SDK harness down.
Shutting SDK harness down.
{noformat}

and NPEs later:

{noformat}
Standard Error
[Time-limited test] INFO org.apache.beam.runners.jobsubmission.JobInvocation - 
Starting job invocation id
[pool-54-thread-1] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - 
Translating pipeline to Flink program.
[pool-54-thread-1] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming 
Environment.
[pool-54-thread-1] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No 
fields were detected for class org.apache.beam.sdk.util.WindowedValue so it 
cannot be used as a POJO type and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance.
[pool-54-thread-1] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No 
fields were detected for class org.apache.beam.sdk.util.WindowedValue so it 
cannot be used as a POJO type and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance.
[Finalizer] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer 
service is shutting down.
[Finalizer] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer 
service is shutting down.
[Finalizer] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer 
service is shutting down.
[Finalizer] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer 
service is shutting down.
[Finalizer] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer 
service is shutting down.
[Finalizer] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer 
service is shutting down.
[pool-54-thread-1] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No 
fields were detected for class org.apache.beam.sdk.util.WindowedValue so it 
cannot be used as a POJO type and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils - The 
configuration option taskmanager.cpu.cores required for local execution is not 
set, setting it to the maximal possible value.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils - The 
configuration option taskmanager.memory.task.heap.size required for local 
execution is not set, setting it to the maximal possible value.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils - The 
configuration option taskmanager.memory.task.off-heap.size required for local 
execution is not set, setting it to the maximal possible value.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils - The 
configuration option taskmanager.memory.network.min required for local 
execution is not set, setting it to its default value 64 mb.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils - The 
configuration option taskmanager.memory.network.max required for local 
execution is not set, setting it to its default value 64 mb.
[pool-54-thread-1] INFO org.apache.flink.runtime.minicluster.MiniCluster - 
Starting Flink Mini Cluster
[pool-54-thread-1] INFO org.apache.flink.runtime.minicluster.MiniCluster - 
Starting Metrics Registry
[pool-54-thread-1] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - 
No metrics reporter configured, no metrics will be exposed/reported.
[pool-54-thread-1] INFO org.apache.flink.runtime.minicluster.MiniCluster - 
Starting RPC Service(s)
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - 
Trying to start local actor system
[flink-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - 
Slf4jLogger started
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - 
Actor system started at akka://flink
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - 
Trying to start local actor system
[flink-metrics-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - 
Actor system started at akka://flink-metrics
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Starting RPC endpoint for 
org.apache.flink.runtime.metrics.dump.MetricQueryService at 
akka://flink-metrics/user/rpc/MetricQueryService .
[pool-54-thread-1] INFO org.apache.flink.runtime.minicluster.MiniCluster - 
Starting high-availability services
[pool-54-thread-1] INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB 
server storage directory /tmp/blobStore-c9650f95-4b93-453a-91ac-a8560876b843
[pool-54-thread-1] INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB 
server at 0.0.0.0:42885 - max concurrent requests: 50 - max backlog: 1000
[pool-54-thread-1] INFO org.apache.flink.runtime.blob.PermanentBlobCache - 
Created BLOB cache storage directory 
/tmp/blobStore-1a540446-b031-4087-acf0-2e21448f2bbf
[pool-54-thread-1] INFO org.apache.flink.runtime.blob.TransientBlobCache - 
Created BLOB cache storage directory 
/tmp/blobStore-c424960f-bbbd-4e33-8325-35bbddfc48ed
[pool-54-thread-1] INFO org.apache.flink.runtime.minicluster.MiniCluster - 
Starting 1 TaskManger(s)
[pool-54-thread-1] INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner 
- Starting TaskManager with ResourceID: 3f91ce1c-996e-45e4-828b-45f9a90b56b0
[pool-54-thread-1] INFO 
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file 
directory '/tmp': total 969 GB, usable 905 GB (93.40% usable)
[pool-54-thread-1] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl 
- FileChannelManager uses directory 
/tmp/flink-io-0437328d-1c68-436c-ad92-3c1ba77e849a for spill files.
[pool-54-thread-1] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl 
- FileChannelManager uses directory 
/tmp/flink-netty-shuffle-57eb3afc-34ab-4058-ace2-ccfb5a286443 for spill files.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB 
for network buffer pool (number of memory segments: 2048, bytes per segment: 
32768).
[pool-54-thread-1] INFO 
org.apache.flink.runtime.io.network.NettyShuffleEnvironment - Starting the 
network environment and its components.
[pool-54-thread-1] INFO org.apache.flink.runtime.taskexecutor.KvStateService - 
Starting the kvState service and its components.
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/rpc/taskmanager_51 .
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Start job 
leader service.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.filecache.FileCache - User file cache uses directory 
/tmp/flink-dist-cache-ccdd947c-fb4f-4e2d-afbd-c02eaca4d471
[pool-54-thread-1] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Starting rest 
endpoint.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Failed to load web 
based job submission extension. Probable reason: flink-runtime-web is not in 
the classpath.
[pool-54-thread-1] WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - 
Log file environment variable 'log.file' is not set.
[pool-54-thread-1] WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - 
JobManager log files are unavailable in the web dashboard. Log file location 
not found in environment variable 'log.file' or configuration key 
'web.log.path'.
[pool-54-thread-1] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint 
listening at localhost:45381
[pool-54-thread-1] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Proposing leadership to contender http://localhost:45381
[pool-54-thread-1] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/rpc/resourcemanager_52 .
[pool-54-thread-1] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Proposing leadership to contender LeaderContender: DefaultDispatcherRunner
[mini-cluster-io-thread-1] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - 
http://localhost:45381 was granted leadership with 
leaderSessionID=cf0aafdb-2129-4115-be77-df338074799e
[mini-cluster-io-thread-1] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Received confirmation of leadership for leader http://localhost:45381 , 
session=cf0aafdb-2129-4115-be77-df338074799e
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Proposing leadership to contender LeaderContender: StandaloneResourceManager
[mini-cluster-io-thread-2] INFO 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - 
Start SessionDispatcherLeaderProcess.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
ResourceManager akka://flink/user/rpc/resourcemanager_52 was granted leadership 
with fencing token 852ca74b8e2728cf18ab971200694c6f
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting 
the SlotManager.
[pool-54-thread-1] INFO org.apache.flink.runtime.minicluster.MiniCluster - 
Flink Mini Cluster started successfully
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Received confirmation of leadership for leader 
akka://flink/user/rpc/resourcemanager_52 , 
session=18ab9712-0069-4c6f-852c-a74b8e2728cf
[mini-cluster-io-thread-4] INFO 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - 
Recover all persisted job graphs.
[mini-cluster-io-thread-4] INFO 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - 
Successfully recovered 0 persisted job graphs.
[mini-cluster-io-thread-4] INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_53 .
[mini-cluster-io-thread-4] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Received confirmation of leadership for leader 
akka://flink/user/rpc/dispatcher_53 , 
session=ae782abb-cc8a-42a8-9dd3-c1b9ff4d18c6
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph 
submission ae0765af42e11919a8fd217c9ed5aa4a 
(portablestateexecutiontest-jenkins-1019181548-71bfb76c).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 
ae0765af42e11919a8fd217c9ed5aa4a 
(portablestateexecutiontest-jenkins-1019181548-71bfb76c).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_54 .
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Initializing job 
portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Using restart back off time 
strategy NoRestartBackoffTimeStrategy for 
portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master 
for job portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization 
on master in 0 ms.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology - Built 1 
pipelined regions in 0 ms
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No checkpoint found 
during restore.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@6d5d00b1
 for portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Proposing leadership to contender akka://flink/user/rpc/jobmanager_54
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Connecting to 
ResourceManager 
akka://flink/user/rpc/resourcemanager_52(852ca74b8e2728cf18ab971200694c6f).
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager 
address, beginning registration
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
Registering TaskManager with ResourceID 3f91ce1c-996e-45e4-828b-45f9a90b56b0 
(akka://flink/user/rpc/taskmanager_51) at ResourceManager
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at 
resource manager akka://flink/user/rpc/resourcemanager_52 under registration id 
f9022dfad5f1359844542869d208a72c.
[mini-cluster-io-thread-11] INFO 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl - JobManager runner for 
job portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a) was granted leadership with session id 
be295e8d-5ee5-4530-8c36-2c640c069542 at akka://flink/user/rpc/jobmanager_54.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job 
portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a) under job master id 
8c362c640c069542be295e8d5ee54530.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling with 
scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 
portablestateexecutiontest-jenkins-1019181548-71bfb76c 
(ae0765af42e11919a8fd217c9ed5aa4a) switched from state CREATED to RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Impulse (1/1) 
(99f6b20fa854798aef66c96096d15a10) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Impulse (1/1) 
(921e043a5e761f4369f5fdc624e89317) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (1/2) (22a911c6d6cf466766fa43845ca663f9) switched from CREATED to 
SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (2/2) (c83a81517672d8831530a88f708b4c8f) switched from CREATED to 
SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [1]ParDo(Anonymous) 
(1/2) (7b7a399e098469acc84d1a72e07f0831) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [1]ParDo(Anonymous) 
(2/2) (e4a6148fb3ee7eb50bc24b0dbad5d6bd) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [6]{statefulDoFn, 
PAssert$4} (1/2) (526919a5589d6979e92abbd0735b5888) switched from CREATED to 
SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [6]{statefulDoFn, 
PAssert$4} (2/2) (dd36537a90b0ca0fd66178e6392c94b5) switched from CREATED to 
SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - ToKeyedWorkItem (1/2) 
(843cf995d5a76f3c98c8e299b4fb90ca) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - ToKeyedWorkItem (2/2) 
(fecef26116844a28baea28f79a3c4d7f) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
PAssert$4/GroupGlobally/GroupByKey -> [5]PAssert$4/{GroupGlobally, GetPane, 
RunChecks, VerifyAssertions} (1/2) (e31ef171e4a26955071e5db4fd643cd9) switched 
from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
PAssert$4/GroupGlobally/GroupByKey -> [5]PAssert$4/{GroupGlobally, GetPane, 
RunChecks, VerifyAssertions} (2/2) (95cbb611a5d92ecc799c404c32628577) switched 
from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Cannot serve slot 
request, no ResourceManager connected. Adding as pending request 
[SlotRequestId{402045388d349bcff71112e4d364974b}]
[jobmanager-future-thread-1] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
- Received confirmation of leadership for leader 
akka://flink/user/rpc/jobmanager_54 , 
session=be295e8d-5ee5-4530-8c36-2c640c069542
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager 
akka://flink/user/rpc/resourcemanager_52(852ca74b8e2728cf18ab971200694c6f)
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager 
address, beginning registration
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
Registering job manager 
8c362c640c069542be295e8d5ee54530@akka://flink/user/rpc/jobmanager_54 for job 
ae0765af42e11919a8fd217c9ed5aa4a.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registered 
job manager 
8c362c640c069542be295e8d5ee54530@akka://flink/user/rpc/jobmanager_54 for job 
ae0765af42e11919a8fd217c9ed5aa4a.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully 
registered at ResourceManager, leader id: 852ca74b8e2728cf18ab971200694c6f.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot 
[SlotRequestId{402045388d349bcff71112e4d364974b}] and profile 
ResourceProfile{UNKNOWN} from resource manager.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request 
slot with profile ResourceProfile{UNKNOWN} for job 
ae0765af42e11919a8fd217c9ed5aa4a with allocation id 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 
9126f805ed20572a1575a160671c7550 for job ae0765af42e11919a8fd217c9ed5aa4a from 
resource manager with leader id 852ca74b8e2728cf18ab971200694c6f.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Add job 
ae0765af42e11919a8fd217c9ed5aa4a for job leader monitoring.
[mini-cluster-io-thread-14] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Try to register 
at job manager akka://flink/user/rpc/jobmanager_54 with leader id 
be295e8d-5ee5-4530-8c36-2c640c069542.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Resolved 
JobManager address, beginning registration
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Successful 
registration at job manager akka://flink/user/rpc/jobmanager_54 for job 
ae0765af42e11919a8fd217c9ed5aa4a.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager 
connection for job ae0765af42e11919a8fd217c9ed5aa4a.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to 
the leader of job ae0765af42e11919a8fd217c9ed5aa4a.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot 
[SlotRequestId{ab8544aab1dfb01d38c203c86d8e2839}] and profile 
ResourceProfile{UNKNOWN} from resource manager.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request 
slot with profile ResourceProfile{UNKNOWN} for job 
ae0765af42e11919a8fd217c9ed5aa4a with allocation id 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 
9b80ba6cd31b571f18e396e825715d97 for job ae0765af42e11919a8fd217c9ed5aa4a from 
resource manager with leader id 852ca74b8e2728cf18ab971200694c6f.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to 
the leader of job ae0765af42e11919a8fd217c9ed5aa4a.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Received repeated 
offer for slot [9126f805ed20572a1575a160671c7550]. Ignoring.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Impulse (1/1) 
(99f6b20fa854798aef66c96096d15a10) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Impulse (1/1) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost 
(dataPort=-1)
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: 
Impulse (1/1).
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Impulse (1/1) 
(921e043a5e761f4369f5fdc624e89317) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Impulse (1/1) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost 
(dataPort=-1)
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Source: Impulse (1/1) (99f6b20fa854798aef66c96096d15a10) switched from CREATED 
to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (attempt #0) to 
3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost (dataPort=-1)
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Loading JAR files for task Source: Impulse (1/1) 
(99f6b20fa854798aef66c96096d15a10) [DEPLOYING].
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Registering task at network: Source: Impulse (1/1) 
(99f6b20fa854798aef66c96096d15a10) [DEPLOYING].
[Source: Impulse (1/1)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Source: Impulse (1/1) (99f6b20fa854798aef66c96096d15a10) switched from 
DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: 
Impulse (1/1).
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (attempt #0) to 
3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (1/2) (22a911c6d6cf466766fa43845ca663f9) switched from SCHEDULED to 
DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (1/2) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ 
localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (2/2) (c83a81517672d8831530a88f708b4c8f) switched from SCHEDULED to 
DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (2/2) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ 
localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2).
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Source: Impulse (1/1) (921e043a5e761f4369f5fdc624e89317) switched from CREATED 
to DEPLOYING.
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Loading JAR files for task Source: Impulse (1/1) 
(921e043a5e761f4369f5fdc624e89317) [DEPLOYING].
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Registering task at network: Source: Impulse (1/1) 
(921e043a5e761f4369f5fdc624e89317) [DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2).
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
switched from CREATED to DEPLOYING.
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
[DEPLOYING].
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Registering task at network: 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
[DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [1]ParDo(Anonymous) 
(1/2) (7b7a399e098469acc84d1a72e07f0831) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[1]ParDo(Anonymous) (1/2) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 
@ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [1]ParDo(Anonymous) 
(2/2) (e4a6148fb3ee7eb50bc24b0dbad5d6bd) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[1]ParDo(Anonymous) (2/2) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 
@ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [6]{statefulDoFn, 
PAssert$4} (1/2) (526919a5589d6979e92abbd0735b5888) switched from SCHEDULED to 
DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[6]{statefulDoFn, PAssert$4} (1/2) (attempt #0) to 
3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [6]{statefulDoFn, 
PAssert$4} (2/2) (dd36537a90b0ca0fd66178e6392c94b5) switched from SCHEDULED to 
DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
[6]{statefulDoFn, PAssert$4} (2/2) (attempt #0) to 
3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - ToKeyedWorkItem (1/2) 
(843cf995d5a76f3c98c8e299b4fb90ca) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
ToKeyedWorkItem (1/2) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ 
localhost (dataPort=-1)
[Source: Impulse (1/1)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Source: Impulse (1/1) (921e043a5e761f4369f5fdc624e89317) switched from 
DEPLOYING to RUNNING.
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
switched from CREATED to DEPLOYING.
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
[DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (1/2).
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Registering task at network: 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
[DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[1]ParDo(Anonymous) (1/2).
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- [4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2) (22a911c6d6cf466766fa43845ca663f9) switched from 
CREATED to DEPLOYING.
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- Loading JAR files for task [4]PAssert$4/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys} (1/2) 
(22a911c6d6cf466766fa43845ca663f9) [DEPLOYING].
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- Registering task at network: [4]PAssert$4/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys} (1/2) 
(22a911c6d6cf466766fa43845ca663f9) [DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[[1]ParDo(Anonymous) (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
[1]ParDo(Anonymous) (1/2) (7b7a399e098469acc84d1a72e07f0831) switched from 
CREATED to DEPLOYING.
[[1]ParDo(Anonymous) (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Loading JAR files for task [1]ParDo(Anonymous) (1/2) 
(7b7a399e098469acc84d1a72e07f0831) [DEPLOYING].
[[1]ParDo(Anonymous) (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Registering task at network: [1]ParDo(Anonymous) (1/2) 
(7b7a399e098469acc84d1a72e07f0831) [DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[1]ParDo(Anonymous) (2/2).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[6]{statefulDoFn, PAssert$4} (1/2).
[[1]ParDo(Anonymous) (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
[1]ParDo(Anonymous) (2/2) (e4a6148fb3ee7eb50bc24b0dbad5d6bd) switched from 
CREATED to DEPLOYING.
[[1]ParDo(Anonymous) (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Loading JAR files for task [1]ParDo(Anonymous) (2/2) 
(e4a6148fb3ee7eb50bc24b0dbad5d6bd) [DEPLOYING].
[[1]ParDo(Anonymous) (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Registering task at network: [1]ParDo(Anonymous) (2/2) 
(e4a6148fb3ee7eb50bc24b0dbad5d6bd) [DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[[6]{statefulDoFn, PAssert$4} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - [6]{statefulDoFn, PAssert$4} (1/2) 
(526919a5589d6979e92abbd0735b5888) switched from CREATED to DEPLOYING.
[[6]{statefulDoFn, PAssert$4} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task 
[6]{statefulDoFn, PAssert$4} (1/2) (526919a5589d6979e92abbd0735b5888) 
[DEPLOYING].
[[6]{statefulDoFn, PAssert$4} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Registering task at network: 
[6]{statefulDoFn, PAssert$4} (1/2) (526919a5589d6979e92abbd0735b5888) 
[DEPLOYING].
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- [4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2) (22a911c6d6cf466766fa43845ca663f9) switched from 
DEPLOYING to RUNNING.
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] WARN org.apache.flink.metrics.MetricGroup - The 
operator name [4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} exceeded the 80 characters length limit and was 
truncated.
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
switched from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - ToKeyedWorkItem (2/2) 
(fecef26116844a28baea28f79a3c4d7f) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
ToKeyedWorkItem (2/2) (attempt #0) to 3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ 
localhost (dataPort=-1)
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2)] WARN 
org.apache.flink.metrics.MetricGroup - The operator name 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} exceeded the 80 characters length limit and 
was truncated.
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (1/2)] INFO 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap 
keyed state backend with stream factory.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
PAssert$4/GroupGlobally/GroupByKey -> [5]PAssert$4/{GroupGlobally, GetPane, 
RunChecks, VerifyAssertions} (1/2) (e31ef171e4a26955071e5db4fd643cd9) switched 
from SCHEDULED to DEPLOYING.
[[1]ParDo(Anonymous) (2/2)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[[1]ParDo(Anonymous) (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
[1]ParDo(Anonymous) (2/2) (e4a6148fb3ee7eb50bc24b0dbad5d6bd) switched from 
DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (2/2).
[[6]{statefulDoFn, PAssert$4} (1/2)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[[6]{statefulDoFn, PAssert$4} (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - [6]{statefulDoFn, PAssert$4} (1/2) 
(526919a5589d6979e92abbd0735b5888) switched from DEPLOYING to RUNNING.
[[1]ParDo(Anonymous) (1/2)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[[1]ParDo(Anonymous) (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
[1]ParDo(Anonymous) (1/2) (7b7a399e098469acc84d1a72e07f0831) switched from 
DEPLOYING to RUNNING.
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2)] INFO 
org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
switched from DEPLOYING to RUNNING.
[[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2)] WARN 
org.apache.flink.metrics.MetricGroup - The operator name 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} exceeded the 80 characters length limit and 
was truncated.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
[6]{statefulDoFn, PAssert$4} (2/2).
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (2/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- [4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (2/2) (c83a81517672d8831530a88f708b4c8f) switched from 
CREATED to DEPLOYING.
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (2/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- Loading JAR files for task [4]PAssert$4/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys} (2/2) 
(c83a81517672d8831530a88f708b4c8f) [DEPLOYING].
[[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, 
Window.Into(), WithKeys} (2/2)] INFO org.apache.flink.runtime.taskmanager.Task 
- Registering task at network: [4]PAssert$4/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys} (2/2) 
(c83a81517672d8831530a88f708b4c8f) [DEPLOYING].
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9b80ba6cd31b571f18e396e825715d97.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
ToKeyedWorkItem (1/2).
[[6]{statefulDoFn, PAssert$4} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - [6]{statefulDoFn, PAssert$4} (2/2) 
(dd36537a90b0ca0fd66178e6392c94b5) switched from CREATED to DEPLOYING.
[[6]{statefulDoFn, PAssert$4} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task 
[6]{statefulDoFn, PAssert$4} (2/2) (dd36537a90b0ca0fd66178e6392c94b5) 
[DEPLOYING].
[[6]{statefulDoFn, PAssert$4} (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Registering task at network: 
[6]{statefulDoFn, PAssert$4} (2/2) (dd36537a90b0ca0fd66178e6392c94b5) 
[DEPLOYING].
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
PAssert$4/GroupGlobally/GroupByKey -> [5]PAssert$4/{GroupGlobally, GetPane, 
RunChecks, VerifyAssertions} (1/2) (attempt #0) to 
3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost (dataPort=-1)
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Source: Impulse (1/1) (99f6b20fa854798aef66c96096d15a10) switched from RUNNING 
to FINISHED.
[Source: Impulse (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - 
Freeing task resources for Source: Impulse (1/1) 
(99f6b20fa854798aef66c96096d15a10).
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
PAssert$4/GroupGlobally/GroupByKey -> [5]PAssert$4/{GroupGlobally, GetPane, 
RunChecks, VerifyAssertions} (2/2) (95cbb611a5d92ecc799c404c32628577) switched 
from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying 
PAssert$4/GroupGlobally/GroupByKey -> [5]PAssert$4/{GroupGlobally, GetPane, 
RunChecks, VerifyAssertions} (2/2) (attempt #0) to 
3f91ce1c-996e-45e4-828b-45f9a90b56b0 @ localhost (dataPort=-1)
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Impulse (1/1) 
(99f6b20fa854798aef66c96096d15a10) switched from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Impulse (1/1) 
(921e043a5e761f4369f5fdc624e89317) switched from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[4]PAssert$4/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys} (1/2) (22a911c6d6cf466766fa43845ca663f9) switched from DEPLOYING to 
RUNNING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (1/2) (2009f6288f1440b2f1a50766144061a5) 
switched from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [1]ParDo(Anonymous) 
(2/2) (e4a6148fb3ee7eb50bc24b0dbad5d6bd) switched from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot 
9126f805ed20572a1575a160671c7550.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [6]{statefulDoFn, 
PAssert$4} (1/2) (526919a5589d6979e92abbd0735b5888) switched from DEPLOYING to 
RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - [1]ParDo(Anonymous) 
(1/2) (7b7a399e098469acc84d1a72e07f0831) switched from DEPLOYING to RUNNING.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
[3]PAssert$4/GroupGlobally/Create.Values/Read(CreateSource)/{ParDo(OutputSingleSource),
 ParDo(BoundedSourceAsSDFWrapper)} (2/2) (9e22f9760d3e65975febf82a0f9865f7) 
switched from DEPLOYING to RUNNING.
[ToKeyedWorkItem (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
ToKeyedWorkItem (1/2) (843cf995d5a76f3c98c8e299b4fb90ca) switched from CREATED 
to DEPLOYING.
[ToKeyedWorkItem (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Loading JAR files for task ToKeyedWorkItem (1/2) 
(843cf995d5a76f3c98c8e299b4fb90ca) [DEPLOYING].
[ToKeyedWorkItem (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Registering task at 
...[truncated 168485 chars]...
efault-dispatcher-2] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results 
produced by task execution 939b158d27ac04cb854c09da06c52409.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task CHAIN 
MapPartition (MapPartition at [4]PAssert$5/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2) (22b9f10be1efde1897b0226e9f8c088e).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - CHAIN MapPartition (MapPartition at 
[4]PAssert$5/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys}) -> FlatMap (FlatMap at ExtractOutput[0]) (2/2) 
(22b9f10be1efde1897b0226e9f8c088e) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task 
code CHAIN MapPartition (MapPartition at 
[4]PAssert$5/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys}) -> FlatMap (FlatMap at ExtractOutput[0]) (2/2) 
(22b9f10be1efde1897b0226e9f8c088e).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task CHAIN 
GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap 
at ExtractOutput[0]) (1/2) (4800cf7f639852287abc4eda108adae5).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at 
[6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/2) 
(4800cf7f639852287abc4eda108adae5) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task 
code CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (1/2) (4800cf7f639852287abc4eda108adae5).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task CHAIN 
GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap 
at ExtractOutput[0]) (2/2) (927b6daf77115b8c769e135bee9c44f9).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at 
[6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at ExtractOutput[0]) (2/2) 
(927b6daf77115b8c769e135bee9c44f9) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task 
code CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2) (927b6daf77115b8c769e135bee9c44f9).
[grpc-default-executor-5] INFO 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService - 
getProcessBundleDescriptor request with id 9-4
[CHAIN MapPartition (MapPartition at [4]PAssert$5/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - CHAIN 
MapPartition (MapPartition at [4]PAssert$5/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2) (22b9f10be1efde1897b0226e9f8c088e) switched from 
CANCELING to CANCELED.
[CHAIN MapPartition (MapPartition at [4]PAssert$5/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - 
Freeing task resources for CHAIN MapPartition (MapPartition at 
[4]PAssert$5/GroupGlobally/{Create.Values, Flatten.PCollections, Window.Into(), 
WithKeys}) -> FlatMap (FlatMap at ExtractOutput[0]) (2/2) 
(22b9f10be1efde1897b0226e9f8c088e).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and 
sending final execution state CANCELED to JobManager for task CHAIN 
MapPartition (MapPartition at [4]PAssert$5/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2) 22b9f10be1efde1897b0226e9f8c088e.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition 
(MapPartition at [4]PAssert$5/GroupGlobally/{Create.Values, 
Flatten.PCollections, Window.Into(), WithKeys}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2) (22b9f10be1efde1897b0226e9f8c088e) switched from 
CANCELING to CANCELED.
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2)] INFO 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory - Closing 
environment urn: "EMBEDDED"
capabilities: "beam:coder:bytes:v1"
capabilities: "beam:coder:bool:v1"
capabilities: "beam:coder:varint:v1"
capabilities: "beam:coder:string_utf8:v1"
capabilities: "beam:coder:iterable:v1"
capabilities: "beam:coder:timer:v1"
capabilities: "beam:coder:kv:v1"
capabilities: "beam:coder:length_prefix:v1"
capabilities: "beam:coder:global_window:v1"
capabilities: "beam:coder:interval_window:v1"
capabilities: "beam:coder:custom_window:v1"
capabilities: "beam:coder:windowed_value:v1"
capabilities: "beam:coder:double:v1"
capabilities: "beam:coder:row:v1"
capabilities: "beam:coder:param_windowed_value:v1"
capabilities: "beam:coder:state_backed_iterable:v1"
capabilities: "beam:coder:sharded_key:v1"
capabilities: "beam:protocol:multi_core_bundle_processing:v1"
capabilities: "beam:protocol:progress_reporting:v1"
capabilities: "beam:protocol:harness_monitoring_infos:v1"
capabilities: "beam:version:sdk_base:apache/beam_java8_sdk:2.35.0.dev"
capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
capabilities: "beam:transform:to_string:v1"

[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at 
[6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/2) 
(4800cf7f639852287abc4eda108adae5) switched from CANCELING to CANCELED.
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (1/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Freeing task resources for CHAIN 
GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap 
at ExtractOutput[0]) (1/2) (4800cf7f639852287abc4eda108adae5).
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and 
sending final execution state CANCELED to JobManager for task CHAIN GroupReduce 
(GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (1/2) 4800cf7f639852287abc4eda108adae5.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN GroupReduce 
(GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (1/2) (4800cf7f639852287abc4eda108adae5) switched from 
CANCELING to CANCELED.
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2)] INFO 
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService - 1 Beam Fn 
Logging clients still connected during shutdown.
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2)] ERROR 
org.apache.beam.runners.fnexecution.control.FnApiControlClient - 
FnApiControlClient closed, clearing outstanding requests 
{2=java.util.concurrent.CompletableFuture@1e570df0[Not completed, 1 
dependents], 3=java.util.concurrent.CompletableFuture@4d674e[Not completed, 1 
dependents]}
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2)] WARN 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer - Hanged up for unknown 
endpoint.
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at 
[6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at ExtractOutput[0]) (2/2) 
(927b6daf77115b8c769e135bee9c44f9) switched from CANCELING to CANCELED.
[CHAIN GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap 
(FlatMap at ExtractOutput[0]) (2/2)] INFO 
org.apache.flink.runtime.taskmanager.Task - Freeing task resources for CHAIN 
GroupReduce (GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap 
at ExtractOutput[0]) (2/2) (927b6daf77115b8c769e135bee9c44f9).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and 
sending final execution state CANCELED to JobManager for task CHAIN GroupReduce 
(GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2) 927b6daf77115b8c769e135bee9c44f9.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN GroupReduce 
(GroupReduce at [6]{statefulDoFn, PAssert$5}) -> FlatMap (FlatMap at 
ExtractOutput[0]) (2/2) (927b6daf77115b8c769e135bee9c44f9) switched from 
CANCELING to CANCELED.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 
portablestateexecutiontest-jenkins-1019181556-a21709d 
(77e69a68705518ec04134f86880f3053) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:206)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:189)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:639)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:396)
        at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error received from SDK harness for instruction 1: 
java.lang.NullPointerException
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
        at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
        at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:268)
        at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:268)
        at 
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:356)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 1: java.lang.NullPointerException
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:255)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:765)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Shutting down Flink Mini 
Cluster
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 
77e69a68705518ec04134f86880f3053 reached globally terminal state FAILED.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest 
endpoint.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor 
akka://flink/user/rpc/taskmanager_55.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager 
connection caf194a3d1e2ba6821810eecc80ebd62.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager 
connection for job 77e69a68705518ec04134f86880f3053.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Closing 
TaskExecutor connection ec00eeb8-044e-4583-aa6f-8264190c0a3b because: The 
TaskExecutor is shutting down.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot 
TaskSlot(index:0, state:ALLOCATED, resource profile: 
ResourceProfile{managedMemory=354.999mb (372243104 bytes), 
networkMemory=32.000mb (33554432 bytes)}, allocationId: 
db2b099377247c984f8236a6d468b4e0, jobId: 77e69a68705518ec04134f86880f3053).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job 
portablestateexecutiontest-jenkins-1019181556-a21709d(77e69a68705518ec04134f86880f3053).
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 
caf194a3d1e2ba6821810eecc80ebd62: Stopping JobMaster for job 
portablestateexecutiontest-jenkins-1019181556-a21709d(77e69a68705518ec04134f86880f3053)..
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect 
job manager 
9756d745fc07d200cddc5a0be1a846e8@akka://flink/user/rpc/jobmanager_58 for job 
77e69a68705518ec04134f86880f3053 from the resource manager.
[flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
[pool-54-thread-1] ERROR org.apache.beam.runners.jobsubmission.JobInvocation - 
Error during job invocation id.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot 
TaskSlot(index:1, state:ALLOCATED, resource profile: 
ResourceProfile{managedMemory=354.999mb (372243104 bytes), 
networkMemory=32.000mb (33554432 bytes)}, allocationId: 
00f4772220e627958ddc4edd73e0b320, jobId: 77e69a68705518ec04134f86880f3053).
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:201)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:926)
        at akka.dispatch.OnComplete.internal(Future.scala:264)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
        at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:206)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:189)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:639)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:396)
        at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error received from SDK harness for instruction 1: 
java.lang.NullPointerException
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
        at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
        at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
        at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:268)
        at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:268)
        at 
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:356)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 1: java.lang.NullPointerException
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:255)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:765)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
[mini-cluster-io-thread-16] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 
77e69a68705518ec04134f86880f3053 with leader id 
9756d745fc07d200cddc5a0be1a846e8 lost leadership.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Stop job leader 
service.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting 
down TaskExecutorLocalStateStoresManager.
Oct 19, 2021 6:16:04 PM 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
 cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=182, 
target=directaddress:///InProcessServer_46} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until 
awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:615)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
        at 
org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:44)
        at 
org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.lambda$getClientFor$0(BeamFnDataGrpcClient.java:117)
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        at 
org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.getClientFor(BeamFnDataGrpcClient.java:111)
        at 
org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.send(BeamFnDataGrpcClient.java:102)
        at 
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.send(QueueingBeamFnDataClient.java:249)
        at 
org.apache.beam.fn.harness.BeamFnDataWriteRunner.registerForOutput(BeamFnDataWriteRunner.java:142)
        at 
org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:111)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:432)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Oct 19, 2021 6:16:04 PM 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
 cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=204, 
target=directaddress:///InProcessServer_52} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until 
awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:615)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
        at 
org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:44)
        at 
org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.lambda$getClientFor$0(BeamFnDataGrpcClient.java:117)
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        at 
org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.getClientFor(BeamFnDataGrpcClient.java:111)
        at 
org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.send(BeamFnDataGrpcClient.java:102)
        at 
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.send(QueueingBeamFnDataClient.java:249)
        at 
org.apache.beam.fn.harness.BeamFnDataWriteRunner.registerForOutput(BeamFnDataWriteRunner.java:142)
        at 
org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:111)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:432)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager 
removed spill file directory /tmp/flink-io-5d68526c-f1ef-4e7d-a1b4-50c5832012d0
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.io.network.NettyShuffleEnvironment - Shutting down the 
network environment and its components.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager 
removed spill file directory 
/tmp/flink-netty-shuffle-ef03cf74-4d9d-48f2-8510-03856eee46ca
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.KvStateService - Shutting down the 
kvState service and its components.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Stop job leader 
service.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.filecache.FileCache - removed file cache directory 
/tmp/flink-dist-cache-6b1a75a7-e6f2-42e3-936e-d45b56ac6d74
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor 
akka://flink/user/rpc/taskmanager_55.
Oct 19, 2021 6:16:04 PM 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
 cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=178, 
target=directaddress:///InProcessServer_47} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until 
awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:615)
        at 
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
        at 
org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:44)
        at 
org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache$GrpcStateClient.<init>(BeamFnStateGrpcClientCache.java:89)
        at 
org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache$GrpcStateClient.<init>(BeamFnStateGrpcClientCache.java:79)
        at 
org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache.createBeamFnStateClient(BeamFnStateGrpcClientCache.java:75)
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        at 
org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache.forApiServiceDescriptor(BeamFnStateGrpcClientCache.java:71)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:603)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:413)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:761)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:408)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[ForkJoinPool.commonPool-worker-2] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache 
directory /tmp/flink-web-ui
[ForkJoinPool.commonPool-worker-2] INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Shut down 
cluster because application is in CANCELED, diagnostics 
DispatcherResourceManagerComponent has been closed..
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 - Closing components.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - 
Stopping SessionDispatcherLeaderProcess.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher 
akka://flink/user/rpc/dispatcher_57.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all 
currently running jobs of dispatcher akka://flink/user/rpc/dispatcher_57.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Closing 
the SlotManager.
[flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - 
Suspending the SlotManager.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator
 - Shutting down back pressure request coordinator.
[flink-akka.actor.default-dispatcher-3] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopped dispatcher 
akka://flink/user/rpc/dispatcher_57.
[mini-cluster-io-thread-20] INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
[flink-metrics-2] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Stopping Akka RPC service.
[flink-metrics-2] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Stopped Akka RPC service.
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:42139
[flink-akka.actor.default-dispatcher-2] INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
Oct 19, 2021 6:16:22 PM 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture
 executeListener
SEVERE: RuntimeException while executing runnable 
CallbackListener{org.apache.beam.runners.jobsubmission.JobInvocation$1@31e727b} 
with executor 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@29fa1def
java.util.concurrent.RejectedExecutionException: Task 
CallbackListener{org.apache.beam.runners.jobsubmission.JobInvocation$1@31e727b} 
rejected from java.util.concurrent.ThreadPoolExecutor@194d60d4[Shutting down, 
pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:537)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1029)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:871)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:716)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.afterRanInterruptibly(TrustedListenableFutureTask.java:133)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:80)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Oct 19, 2021 6:16:22 PM 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture
 executeListener
SEVERE: RuntimeException while executing runnable 
CallbackListener{org.apache.beam.runners.jobsubmission.JobInvocation$1@31e727b} 
with executor 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator@29fa1def
java.util.concurrent.RejectedExecutionException: Task 
CallbackListener{org.apache.beam.runners.jobsubmission.JobInvocation$1@31e727b} 
rejected from java.util.concurrent.ThreadPoolExecutor@194d60d4[Shutting down, 
pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:537)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1029)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:871)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:716)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.afterRanInterruptibly(TrustedListenableFutureTask.java:133)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:80)
        at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

{noformat}
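
For reference, the two SEVERE messages above point at the same kind of lifecycle problem: a resource torn down without waiting for termination, and a callback handed to an executor that is already shutting down. Below is a minimal sketch of both behaviours using only java.util.concurrent. It is not Beam or gRPC code; the class and method names (ShutdownSketch, rejectedAfterShutdown, shutdownAndAwait) are made up for illustration. gRPC's ManagedChannel exposes the same shutdown()/shutdownNow()/awaitTermination() trio, so the same pattern should apply there, but that is an assumption about where the fix belongs, not a diagnosis of this flake.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    /** Returns true if a task handed to an already shut down executor is rejected. */
    static boolean rejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            // The default AbortPolicy throws once shutdown() has been called.
            executor.execute(() -> {});
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    /**
     * Orderly shutdown: stop accepting work, wait for running tasks,
     * and fall back to shutdownNow() if the timeout expires.
     * Returns true if the executor terminated.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeoutMillis) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executor.shutdownNow();
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt and force the shutdown.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {});
        System.out.println("terminated cleanly: " + shutdownAndAwait(executor, 5000));
    }
}
```

If the test teardown (or the timeout path) shuts the executor down while a job future is still completing, a listener attached to that future would be rejected in the way shown by rejectedAfterShutdown(). Waiting for termination before tearing down the rest of the harness, as in shutdownAndAwait(), is one possible way to avoid that ordering.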

> org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
> false] is flaky
> ---------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12291
>                 URL: https://issues.apache.org/jira/browse/BEAM-12291
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Kenneth Knowles
>            Priority: P1
>              Labels: flake, stale-assigned
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Example: https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/17600/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
