[ 
https://issues.apache.org/jira/browse/FLINK-30729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683708#comment-17683708
 ] 

Yanfei Lei commented on FLINK-30729:
------------------------------------

Because [most 
tests|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala#L241]
 in {{o.a.f.table.planner.runtime.stream.table.AggregateITCase}} use 
`failingDataSource` to test job failover, this checkState condition is 
triggered during the job failover, so it does not cause the tests to fail.
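
As a rough illustration of the pattern only (this is not the Flink test code), the sketch below simulates a source that throws a single "Artificial Failure" and a runner that restarts it once, mirroring maxNumberRestartAttempts=1 from the log. The names FailoverSketch, FailingSource and runWithRestarts are hypothetical, and the sketch restarts from scratch instead of restoring from a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

public class FailoverSketch {

    /** Hypothetical stand-in for a failing source: throws once, halfway through its data. */
    public static final class FailingSource {
        private final List<Integer> data;
        private boolean failedBefore = false;

        public FailingSource(List<Integer> data) {
            this.data = data;
        }

        public void run(List<Integer> sink) throws Exception {
            int failAt = data.size() / 2;
            for (int i = 0; i < data.size(); i++) {
                if (!failedBefore && i == failAt) {
                    failedBefore = true; // fail only on the first attempt
                    throw new Exception("Artificial Failure");
                }
                sink.add(data.get(i));
            }
        }
    }

    /** Runs the source, restarting it at most maxRestartAttempts times after a failure. */
    public static List<Integer> runWithRestarts(FailingSource source, int maxRestartAttempts)
            throws Exception {
        int attempt = 0;
        while (true) {
            // Simplification: each attempt starts with an empty sink (no checkpoint restore).
            List<Integer> sink = new ArrayList<>();
            try {
                source.run(sink);
                return sink;
            } catch (Exception e) {
                if (attempt++ >= maxRestartAttempts) {
                    throw e; // restart budget exhausted, the job fails for real
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> out = runWithRestarts(new FailingSource(List.of(1, 2, 3, 4)), 1);
        System.out.println(out); // prints [1, 2, 3, 4]
    }
}
```

The point of the sketch is that the injected failure is expected and absorbed by the restart, so whatever happens during that failover does not by itself fail the test.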

 

The following is one of the test logs. It shows that the job went through a 
failover in this test, so maybe we can close this ticket.

 

 
Test 
testPrecisionForSumAggregationOnDecimal[StateBackend=ROCKSDB](org.apache.flink.table.planner.runtime.stream.table.AggregateITCase)
 is running.
--------------------------------------------------------------------------------
01:09:01,401 [ main] INFO 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector [] - Randomly 
selected true for execution.checkpointing.unaligned.enabled
01:09:01,401 [ main] INFO 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector [] - Randomly 
selected PT0S for execution.checkpointing.aligned-checkpoint-timeout
01:09:01,401 [ main] INFO 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector [] - Randomly 
selected true for state.backend.changelog.enabled
01:09:01,401 [ main] INFO 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector [] - Randomly 
selected PT0.1S for state.backend.changelog.periodic-materialize.interval
01:09:01,408 [ main] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
01:09:01,416 [ main] INFO org.apache.flink.api.java.typeutils.TypeExtractor [] 
- Field Row#fieldByName will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
01:09:01,416 [ main] INFO org.apache.flink.api.java.typeutils.TypeExtractor [] 
- class java.util.LinkedHashMap does not contain a getter for field accessOrder
01:09:01,416 [ main] INFO org.apache.flink.api.java.typeutils.TypeExtractor [] 
- class java.util.LinkedHashMap does not contain a setter for field accessOrder
01:09:01,416 [ main] INFO org.apache.flink.api.java.typeutils.TypeExtractor [] 
- Class class java.util.LinkedHashMap cannot be used as a POJO type because not 
all fields are valid POJO fields, and must be processed as GenericType. Please 
read the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
01:09:01,416 [ main] INFO org.apache.flink.api.java.typeutils.TypeExtractor [] 
- Field Row#positionByName will be processed as GenericType. Please read the 
Flink documentation on "Data Types & Serialization" for details of the effect 
on performance and schema evolution.
01:09:01,416 [ main] INFO org.apache.flink.api.java.typeutils.TypeExtractor [] 
- class org.apache.flink.types.Row is missing a default constructor so it 
cannot be used as a POJO type and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
01:09:01,501 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph 
submission 'Flink Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,501 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 
'Flink Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,501 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
[] - Proposing leadership to contender LeaderContender: 
JobMasterServiceLeadershipRunner
01:09:01,501 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_715 .
01:09:01,501 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'Flink 
Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,503 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time 
strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, 
backoffTimeMS=0) for Flink Streaming Job (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,503 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Created execution 
graph adcc2a646d5d2f67d85c7ff837ecdef9 for job 342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,503 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on 
master for job Flink Streaming Job (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,503 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran 
initialization on master in 0 ms.
01:09:01,503 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to 
configure application-defined state backend: 
RocksDBStateBackend\{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined 
state backend: RocksDBStateBackend\{checkpointStreamBackend=File State Backend 
(checkpoints: 'file:/tmp/junit8120086463983465670/junit6880377978597470118', 
savepoints: 'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.state.changelog.AbstractChangelogStateBackend [] - 
ChangelogStateBackend is used, delegating RocksDBStateBackend.
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader 
loads ChangelogStateBackend to delegate RocksDBStateBackend
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using legacy state backend 
RocksDBStateBackend\{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152} as Job checkpoint storage
01:09:01,505 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint 
found during restore.
01:09:01,506 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@344df908
 for Flink Streaming Job (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,506 [jobmanager-io-thread-17] INFO 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
[] - Received confirmation of leadership for leader 
akka://flink/user/rpc/jobmanager_715 , 
session=da0f1504-fdd5-494d-8a64-759db5b33b5e
01:09:01,506 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 
'Flink Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49) under job master id 
8a64759db5b33b5eda0f1504fdd5494d.
01:09:01,506 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with 
scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
01:09:01,506 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (342047f7eef5e3f1ba2c9dac1ca59a49) switched from state CREATED to RUNNING.
01:09:01,507 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from CREATED to SCHEDULED.
01:09:01,507 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from CREATED to SCHEDULED.
01:09:01,507 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager 
akka://flink/user/rpc/resourcemanager_692(9a64c20cfad53b52a522a2b00a584392)
01:09:01,507 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager 
address, beginning registration
01:09:01,507 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering job manager 
8a64759db5b33b5eda0f1504fdd5494d@akka://flink/user/rpc/jobmanager_715 for job 
342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,508 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registered job manager 
8a64759db5b33b5eda0f1504fdd5494d@akka://flink/user/rpc/jobmanager_715 for job 
342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,508 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully 
registered at ResourceManager, leader id: 9a64c20cfad53b52a522a2b00a584392.
01:09:01,508 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job 342047f7eef5e3f1ba2c9dac1ca59a49: 
[ResourceRequirement\{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
01:09:01,563 [ Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
trigger checkpoint for job 342047f7eef5e3f1ba2c9dac1ca59a49 since Checkpoint 
triggering task Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1) of job 342047f7eef5e3f1ba2c9dac1ca59a49 is not being executed at the 
moment. Aborting checkpoint. Failure reason: Not all required tasks are 
currently running..
01:09:01,570 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 
3a3d35a0d8345d765a91cd668f0b8bfa for job 342047f7eef5e3f1ba2c9dac1ca59a49 from 
resource manager with leader id 9a64c20cfad53b52a522a2b00a584392.
01:09:01,570 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 
3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,570 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
342047f7eef5e3f1ba2c9dac1ca59a49 for job leader monitoring.
01:09:01,570 [mini-cluster-io-thread-3] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager akka://flink/user/rpc/jobmanager_715 with leader id 
da0f1504-fdd5-494d-8a64-759db5b33b5e.
01:09:01,570 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager akka://flink/user/rpc/jobmanager_715 for job 
342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager 
connection for job 342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to 
the leader of job 342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from SCHEDULED to DEPLOYING.
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1) (attempt #0) with 
attempt id 
adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and 
vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 
93621b6a-2d19-4fdd-a0a4-2f2c26ecfe0a @ localhost (dataPort=-1) with allocation 
id 3a3d35a0d8345d765a91cd668f0b8bfa
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from SCHEDULED to DEPLOYING.
01:09:01,571 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) (attempt 
#0) with attempt id 
adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0 and 
vertex id c27dcf7b54ef6bfd6cff02ca8870b681_0 to 
93621b6a-2d19-4fdd-a0a4-2f2c26ecfe0a @ localhost (dataPort=-1) with allocation 
id 3a3d35a0d8345d765a91cd668f0b8bfa
01:09:01,572 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,572 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - 
Creating a changelog storage with name 'memory'.
01:09:01,573 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0), deploy 
into slot with allocation id 3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,573 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,573 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from CREATED to DEPLOYING.
01:09:01,573 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files 
for task Source: Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
[DEPLOYING].
01:09:01,574 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0), deploy 
into slot with allocation id 3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,574 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,575 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from CREATED to DEPLOYING.
01:09:01,575 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files 
for task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
[DEPLOYING].
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend\{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend\{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
application-defined state backend: 
RocksDBStateBackend\{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
application-defined state backend: 
RocksDBStateBackend\{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.state.changelog.AbstractChangelogStateBackend [] 
- ChangelogStateBackend is used, delegating RocksDBStateBackend.
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.changelog.AbstractChangelogStateBackend [] 
- ChangelogStateBackend is used, delegating RocksDBStateBackend.
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.state.StateBackendLoader [] - State 
backend loader loads ChangelogStateBackend to delegate RocksDBStateBackend
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.runtime.state.StateBackendLoader [] - State 
backend loader loads ChangelogStateBackend to delegate RocksDBStateBackend
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
legacy state backend RocksDBStateBackend\{checkpointStreamBackend=File State 
Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152} as Job checkpoint storage
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
legacy state backend RocksDBStateBackend\{checkpointStreamBackend=File State 
Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152} as Job checkpoint storage
01:09:01,576 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from DEPLOYING to INITIALIZING.
01:09:01,576 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from DEPLOYING to INITIALIZING.
01:09:01,576 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from DEPLOYING to INITIALIZING.
01:09:01,576 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from DEPLOYING to INITIALIZING.
01:09:01,598 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Getting shared memory for RocksDB: shareScope=SLOT, managed=false
01:09:01,598 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Obtained shared RocksDB cache of size 20971520 bytes
01:09:01,604 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from INITIALIZING to RUNNING.
01:09:01,604 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from INITIALIZING to RUNNING.
01:09:01,614 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Finished building RocksDB keyed state-backend at 
/tmp/junit7725981464549332018/junit7198969602070316517/minicluster_731556ba7d1abf61a0c79e8d9425fd67/tm_0/tmp/job_342047f7eef5e3f1ba2c9dac1ca59a49_op_KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681__1_1__uuid_5a1c245f-733b-408e-9a19-05bfef176535.
01:09:01,614 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
starts periodic materialization
01:09:01,614 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
schedules the next materialization in 0 seconds
01:09:01,635 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] WARN org.apache.flink.contrib.streaming.state.RocksDBOperationUtils [] 
- RocksDBStateBackend performance will be poor because of the current Flink 
memory configuration! RocksDB will flush memtable constantly, causing high IO 
and CPU. Typically the easiest fix is to increase task manager managed memory 
size. If running locally, see the parameter taskmanager.memory.managed.size. 
Details: arenaBlockSize 8388608 > mutableLimit 6116692 (writeBufferSize = 
67108864, arenaBlockSizeConfigured = 0, defaultArenaBlockSize = 8388608, 
writeBufferManagerCapacity = 6990506)
01:09:01,643 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] WARN org.apache.flink.contrib.streaming.state.RocksDBOperationUtils [] 
- RocksDBStateBackend performance will be poor because of the current Flink 
memory configuration! RocksDB will flush memtable constantly, causing high IO 
and CPU. Typically the easiest fix is to increase task manager managed memory 
size. If running locally, see the parameter taskmanager.memory.managed.size. 
Details: arenaBlockSize 8388608 > mutableLimit 6116692 (writeBufferSize = 
67108864, arenaBlockSizeConfigured = 0, defaultArenaBlockSize = 8388608, 
writeBufferManagerCapacity = 6990506)
01:09:01,652 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] WARN org.apache.flink.contrib.streaming.state.RocksDBOperationUtils [] 
- RocksDBStateBackend performance will be poor because of the current Flink 
memory configuration! RocksDB will flush memtable constantly, causing high IO 
and CPU. Typically the easiest fix is to increase task manager managed memory 
size. If running locally, see the parameter taskmanager.memory.managed.size. 
Details: arenaBlockSize 8388608 > mutableLimit 6116692 (writeBufferSize = 
67108864, arenaBlockSizeConfigured = 0, defaultArenaBlockSize = 8388608, 
writeBufferManagerCapacity = 6990506)
01:09:01,663 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Initialize Materialization. Current changelog writers last append to sequence 
number 0
01:09:01,663 [ Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
trigger checkpoint for job 342047f7eef5e3f1ba2c9dac1ca59a49 since Checkpoint 
triggering task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1) of job 342047f7eef5e3f1ba2c9dac1ca59a49 is not being executed at the 
moment. Aborting checkpoint. Failure reason: Not all required tasks are 
currently running..
01:09:01,663 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
schedules the next materialization in 0 seconds
01:09:01,663 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 has 
no state updates since last materialization, skip this one and schedule the 
next one in 0 seconds
01:09:01,663 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from INITIALIZING to RUNNING.
01:09:01,664 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from INITIALIZING to RUNNING.
01:09:01,763 [ Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 1 (type=CheckpointType\{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1674004141762 for job 
342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,763 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Initialize Materialization. Current changelog writers last append to sequence 
number 2
01:09:01,763 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Starting materialization from 0 : 2
01:09:01,775 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
snapshot of GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0 for checkpoint 1, change range: 0..2, materialization ID 0
01:09:01,777 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
finishes materialization, updates the snapshotState upTo 2 : 
org.apache.flink.runtime.state.SnapshotResult@c25958f
01:09:01,777 [AsyncOperations-thread-1] INFO 
org.apache.flink.state.common.PeriodicMaterializationManager [] - Task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 schedules 
the next materialization in 0 seconds
01:09:01,779 [jobmanager-io-thread-28] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
checkpoint 1 for job 342047f7eef5e3f1ba2c9dac1ca59a49 (4483 bytes, 
checkpointDuration=17 ms, finalizationTime=0 ms).
01:09:01,781 [Channel state writer Source: Custom Source -> 
SourceConversion[2291] -> Calc[2292] (1/1)#0] INFO 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - Source: Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
discarding 0 drained requests
01:09:01,782 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from RUNNING to FAILED with failure cause:
java.lang.Exception: Artificial Failure
at 
org.apache.flink.table.planner.runtime.utils.FailingCollectionSource.run(FailingCollectionSource.java:172)
 ~[test-classes/:?]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) 
~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
 ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
01:09:01,782 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task 
resources for Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#0 (adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
01:09:01,782 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FAILED to JobManager for task Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#0 
adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
01:09:01,783 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from RUNNING to FAILED on 93621b6a-2d19-4fdd-a0a4-2f2c26ecfe0a @ 
localhost (dataPort=-1).
java.lang.Exception: Artificial Failure
at 
org.apache.flink.table.planner.runtime.utils.FailingCollectionSource.run(FailingCollectionSource.java:172)
 ~[test-classes/:?]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) 
~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
 ~[flink-streaming-java-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
01:09:01,784 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - 2 tasks will be restarted to 
recover the failed task 
adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
01:09:01,784 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (342047f7eef5e3f1ba2c9dac1ca59a49) switched from state RUNNING to 
RESTARTING.
01:09:01,784 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from RUNNING to CANCELING.
01:09:01,784 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0).
01:09:01,784 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate[2294] -> 
SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from RUNNING to CANCELING.
01:09:01,784 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task 
code GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0).
01:09:01,785 [Channel state writer GroupAggregate[2294] -> SinkConversion[2295] 
-> Sink: Unnamed (1/1)#0] INFO 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
discarding 0 drained requests
01:09:01,785 [Canceler for GroupAggregate[2294] -> SinkConversion[2295] -> 
Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0).] INFO 
org.apache.flink.state.common.PeriodicMaterializationManager [] - Shutting down 
PeriodicMaterializationManager.
01:09:01,787 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend 
[] - Closed RocksDB State Backend. Cleaning up RocksDB working directory 
/tmp/junit7725981464549332018/junit7198969602070316517/minicluster_731556ba7d1abf61a0c79e8d9425fd67/tm_0/tmp/job_342047f7eef5e3f1ba2c9dac1ca59a49_op_KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681__1_1__uuid_5a1c245f-733b-408e-9a19-05bfef176535.
01:09:01,791 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from CANCELING to CANCELED.
01:09:01,791 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task 
resources for GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#0 (adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0).
01:09:01,791 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state CANCELED to JobManager for task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#0 
adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0.
01:09:01,791 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_0) 
switched from CANCELING to CANCELED.
01:09:01,792 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job 342047f7eef5e3f1ba2c9dac1ca59a49
01:09:01,792 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (342047f7eef5e3f1ba2c9dac1ca59a49) switched from state RESTARTING to 
RUNNING.
01:09:01,792 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 
342047f7eef5e3f1ba2c9dac1ca59a49 from Checkpoint 1 @ 1674004141762 for 
342047f7eef5e3f1ba2c9dac1ca59a49 located at 
file:/tmp/junit8120086463983465670/junit6880377978597470118/342047f7eef5e3f1ba2c9dac1ca59a49/chk-1.
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state 
to restore
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from CREATED to SCHEDULED.
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from CREATED to SCHEDULED.
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from SCHEDULED to DEPLOYING.
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: 
Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1) (attempt #1) with 
attempt id 
adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1 and 
vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 
93621b6a-2d19-4fdd-a0a4-2f2c26ecfe0a @ localhost (dataPort=-1) with allocation 
id 3a3d35a0d8345d765a91cd668f0b8bfa
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from SCHEDULED to DEPLOYING.
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) (attempt 
#1) with attempt id 
adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1 and 
vertex id c27dcf7b54ef6bfd6cff02ca8870b681_0 to 
93621b6a-2d19-4fdd-a0a4-2f2c26ecfe0a @ localhost (dataPort=-1) with allocation 
id 3a3d35a0d8345d765a91cd668f0b8bfa
01:09:01,793 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,793 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job 342047f7eef5e3f1ba2c9dac1ca59a49: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
01:09:01,794 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: 
Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1), deploy 
into slot with allocation id 3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,795 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,795 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from CREATED to DEPLOYING.
01:09:01,795 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files 
for task Source: Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
[DEPLOYING].
01:09:01,796 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1), deploy 
into slot with allocation id 3a3d35a0d8345d765a91cd668f0b8bfa.
01:09:01,796 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from CREATED to DEPLOYING.
01:09:01,796 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files 
for task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
[DEPLOYING].
01:09:01,796 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,796 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
01:09:01,796 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,797 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.state.changelog.AbstractChangelogStateBackend [] 
- ChangelogStateBackend is used, delegating RocksDBStateBackend.
01:09:01,797 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.state.StateBackendLoader [] - State 
backend loader loads ChangelogStateBackend to delegate RocksDBStateBackend
01:09:01,797 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
legacy state backend RocksDBStateBackend{checkpointStreamBackend=File State 
Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152} as Job checkpoint storage
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,797 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from DEPLOYING to INITIALIZING.
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152}
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.changelog.AbstractChangelogStateBackend [] 
- ChangelogStateBackend is used, delegating RocksDBStateBackend.
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.state.StateBackendLoader [] - State 
backend loader loads ChangelogStateBackend to delegate RocksDBStateBackend
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using 
legacy state backend RocksDBStateBackend{checkpointStreamBackend=File State 
Backend (checkpoints: 
'file:/tmp/junit8120086463983465670/junit6880377978597470118', savepoints: 
'null, fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=true, numberOfTransferThreads=4, 
writeBatchSize=2097152} as Job checkpoint storage
01:09:01,797 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from DEPLOYING to INITIALIZING.
01:09:01,797 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from DEPLOYING to INITIALIZING.
01:09:01,801 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from DEPLOYING to INITIALIZING.
01:09:01,801 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from INITIALIZING to RUNNING.
01:09:01,801 [ Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
trigger checkpoint for job 342047f7eef5e3f1ba2c9dac1ca59a49 since Checkpoint 
triggering task Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1) of job 342047f7eef5e3f1ba2c9dac1ca59a49 is not being executed at the 
moment. Aborting checkpoint. Failure reason: Not all required tasks are 
currently running..
01:09:01,801 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from INITIALIZING to RUNNING.
01:09:01,802 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Getting shared memory for RocksDB: shareScope=SLOT, managed=false
01:09:01,802 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Obtained shared RocksDB cache of size 20971520 bytes
01:09:01,824 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Finished building RocksDB keyed state-backend at 
/tmp/junit7725981464549332018/junit7198969602070316517/minicluster_731556ba7d1abf61a0c79e8d9425fd67/tm_0/tmp/job_342047f7eef5e3f1ba2c9dac1ca59a49_op_KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681__1_1__uuid_00457371-7476-4757-9546-8bc8e92a1cec.
01:09:01,824 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO 
org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - 
Creating a changelog storage with name 'memory' to restore from 
'InMemoryChangelogStateHandle'.
01:09:01,826 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] WARN org.apache.flink.contrib.streaming.state.RocksDBOperationUtils [] 
- RocksDBStateBackend performance will be poor because of the current Flink 
memory configuration! RocksDB will flush memtable constantly, causing high IO 
and CPU. Typically the easiest fix is to increase task manager managed memory 
size. If running locally, see the parameter taskmanager.memory.managed.size. 
Details: arenaBlockSize 8388608 > mutableLimit 6116692 (writeBufferSize = 
67108864, arenaBlockSizeConfigured = 0, defaultArenaBlockSize = 8388608, 
writeBufferManagerCapacity = 6990506)
01:09:01,832 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
starts periodic materialization
01:09:01,832 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
schedules the next materialization in 0 seconds
01:09:01,833 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] WARN org.apache.flink.contrib.streaming.state.RocksDBOperationUtils [] 
- RocksDBStateBackend performance will be poor because of the current Flink 
memory configuration! RocksDB will flush memtable constantly, causing high IO 
and CPU. Typically the easiest fix is to increase task manager managed memory 
size. If running locally, see the parameter taskmanager.memory.managed.size. 
Details: arenaBlockSize 8388608 > mutableLimit 6116692 (writeBufferSize = 
67108864, arenaBlockSizeConfigured = 0, defaultArenaBlockSize = 8388608, 
writeBufferManagerCapacity = 6990506)
01:09:01,844 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] WARN org.apache.flink.contrib.streaming.state.RocksDBOperationUtils [] 
- RocksDBStateBackend performance will be poor because of the current Flink 
memory configuration! RocksDB will flush memtable constantly, causing high IO 
and CPU. Typically the easiest fix is to increase task manager managed memory 
size. If running locally, see the parameter taskmanager.memory.managed.size. 
Details: arenaBlockSize 8388608 > mutableLimit 6116692 (writeBufferSize = 
67108864, arenaBlockSizeConfigured = 0, defaultArenaBlockSize = 8388608, 
writeBufferManagerCapacity = 6990506)
01:09:01,851 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from INITIALIZING to RUNNING.
01:09:01,852 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from INITIALIZING to RUNNING.
01:09:01,858 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Initialize Materialization. Current changelog writers last append to sequence 
number 2
01:09:01,858 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Starting materialization from 0 : 2
01:09:01,873 [AsyncOperations-thread-1] INFO 
org.apache.flink.state.common.PeriodicMaterializationManager [] - Task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 schedules 
the next materialization in 0 seconds
01:09:01,873 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
Task GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
finishes materialization, updates the snapshotState upTo 2 : 
org.apache.flink.runtime.state.SnapshotResult@35e4d010
01:09:01,901 [ Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
checkpoint 2 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1674004141900 for job 
342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,902 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.changelog.ChangelogKeyedStateBackend [] - 
snapshot of GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1 for checkpoint 2, change range: 2..2, materialization ID 1
01:09:01,906 [jobmanager-io-thread-30] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
checkpoint 2 for job 342047f7eef5e3f1ba2c9dac1ca59a49 (22764 bytes, 
checkpointDuration=6 ms, finalizationTime=0 ms).
01:09:01,907 [Channel state writer Source: Custom Source -> 
SourceConversion[2291] -> Calc[2292] (1/1)#1] INFO 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - Source: Custom Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
discarding 0 drained requests
01:09:01,907 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from RUNNING to FINISHED.
01:09:01,907 [Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task 
resources for Source: Custom Source -> SourceConversion[2291] -> Calc[2292] 
(1/1)#1 (adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1).
01:09:01,908 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FINISHED to JobManager for task Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1)#1 
adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1.
01:09:01,908 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom 
Source -> SourceConversion[2291] -> Calc[2292] (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from RUNNING to FINISHED.
01:09:01,909 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend 
[] - Closed RocksDB State Backend. Cleaning up RocksDB working directory 
/tmp/junit7725981464549332018/junit7198969602070316517/minicluster_731556ba7d1abf61a0c79e8d9425fd67/tm_0/tmp/job_342047f7eef5e3f1ba2c9dac1ca59a49_op_KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681__1_1__uuid_00457371-7476-4757-9546-8bc8e92a1cec.
01:09:01,910 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.state.common.PeriodicMaterializationManager [] - 
Shutting down PeriodicMaterializationManager.
01:09:01,911 [Channel state writer GroupAggregate[2294] -> SinkConversion[2295] 
-> Sink: Unnamed (1/1)#1] INFO 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
discarding 0 drained requests
01:09:01,911 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from RUNNING to FINISHED.
01:09:01,911 [GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task 
resources for GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed 
(1/1)#1 (adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1).
01:09:01,911 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FINISHED to JobManager for task 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1)#1 
adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1.
01:09:01,911 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
GroupAggregate[2294] -> SinkConversion[2295] -> Sink: Unnamed (1/1) 
(adcc2a646d5d2f67d85c7ff837ecdef9_c27dcf7b54ef6bfd6cff02ca8870b681_0_1) 
switched from RUNNING to FINISHED.
01:09:01,912 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink Streaming 
Job (342047f7eef5e3f1ba2c9dac1ca59a49) switched from state RUNNING to FINISHED.
01:09:01,912 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job 342047f7eef5e3f1ba2c9dac1ca59a49
01:09:01,912 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping 
checkpoint coordinator for job 342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,913 [flink-akka.actor.default-dispatcher-9] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
342047f7eef5e3f1ba2c9dac1ca59a49 reached terminal state FINISHED.
01:09:01,913 [mini-cluster-io-thread-2] INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
342047f7eef5e3f1ba2c9dac1ca59a49 has been registered for cleanup in the 
JobResultStore after reaching a terminal state.
01:09:01,913 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for 
job 'Flink Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,913 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Shutting down
01:09:01,914 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Disconnect TaskExecutor 
93621b6a-2d19-4fdd-a0a4-2f2c26ecfe0a because: Stopping JobMaster for job 'Flink 
Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,914 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Releasing slot [3a3d35a0d8345d765a91cd668f0b8bfa].
01:09:01,914 [flink-akka.actor.default-dispatcher-8] INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager 
connection 9671a7bdc2b6213f2d9c99d1018dbb85: Stopping JobMaster for job 'Flink 
Streaming Job' (342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,914 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:2, state:ACTIVE, resource profile: 
ResourceProfile{taskHeapMemory=256.000gb (274877906944 bytes), 
taskOffHeapMemory=256.000gb (274877906944 bytes), managedMemory=20.000mb 
(20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationId: 
3a3d35a0d8345d765a91cd668f0b8bfa, jobId: 342047f7eef5e3f1ba2c9dac1ca59a49).
01:09:01,914 [flink-akka.actor.default-dispatcher-5] INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
8a64759db5b33b5eda0f1504fdd5494d@akka://flink/user/rpc/jobmanager_715 for job 
342047f7eef5e3f1ba2c9dac1ca59a49 from the resource manager.
01:09:01,914 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
342047f7eef5e3f1ba2c9dac1ca59a49 from job leader monitoring.
01:09:01,914 [flink-akka.actor.default-dispatcher-7] INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager 
connection for job 342047f7eef5e3f1ba2c9dac1ca59a49.
01:09:01,915 [ main] INFO 
org.apache.flink.table.planner.runtime.stream.table.AggregateITCase [] -
--------------------------------------------------------------------------------
Test 
testPrecisionForSumAggregationOnDecimal[StateBackend=ROCKSDB](org.apache.flink.table.planner.runtime.stream.table.AggregateITCase)
 successfully run.
 
> AggregateITCase generates stacktraces of IllegalStateExceptions in Maven 
> output but doesn't fail
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30729
>                 URL: https://issues.apache.org/jira/browse/FLINK-30729
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends, Table SQL / API
>    Affects Versions: 1.17.0
>            Reporter: Matthias Pohl
>            Priority: Critical
>
> We're seeing some weird IllegalStateException stacktraces in CI for 
> {{o.a.f.table.planner.runtime.stream.table.AggregateITCase}} which, 
> interestingly, don't cause the test to fail. That's something we should 
> investigate, I guess:
> {code}
> java.lang.IllegalStateException: LogWriter is closed
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>       at 
> org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriter.appendMeta(InMemoryStateChangelogWriter.java:64)
>       at 
> org.apache.flink.state.changelog.AbstractStateChangeLogger.logMetaIfNeeded(AbstractStateChangeLogger.java:156)
>       at 
> org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:140)
>       at 
> org.apache.flink.state.changelog.AbstractStateChangeLogger.valueCleared(AbstractStateChangeLogger.java:104)
>       at 
> org.apache.flink.state.changelog.ChangelogListState.clear(ChangelogListState.java:113)
>       at 
> org.apache.flink.runtime.state.ttl.TtlListState$IteratorWithCleanup.cleanupIfEmpty(TtlListState.java:193)
>       at 
> org.apache.flink.runtime.state.ttl.TtlListState$IteratorWithCleanup.hasNext(TtlListState.java:186)
>       at 
> org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$DataViewTestAgg.getValue(JavaUserDefinedAggFunctions.java:355)
>       at GroupAggsHandler$10675.getValue(Unknown Source)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>       at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:236)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:546)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:835)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:784)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:945)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:738)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:556)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=13847



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
