[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367253#comment-17367253
 ] 

Stephan Ewen commented on FLINK-22416:
--------------------------------------

Here is a thread dump (excluding the auxiliary threads, like waiting I/O pool 
threads, blob server, REST handler, etc.).

One confusing thing is to see unspilling threads from the channel state.

{code}
"collect-sink-operator-coordinator-executor-thread-pool-thread-1" #1436 daemon 
prio=5 os_prio=0 tid=0x00007fda30073000 nid=0x4683 waiting on condition 
[0x00007fd7da6e5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f865ca78> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"channel-state-unspilling-thread-1" #1435 daemon prio=5 os_prio=0 
tid=0x00007fd878002800 nid=0x4682 waiting on condition [0x00007fd7dbdfc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8b012c0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (4/4)#0" #1434 daemon prio=5 os_prio=0 
tid=0x00007fd8c0003000 nid=0x4681 waiting on condition [0x00007fd8187c6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8c34308> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue.getBatchBlocking(ClosableBlockingQueue.java:396)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:240)

"Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (4/4)#0" #1410 prio=5 os_prio=0 tid=0x00007fd84c002000 
nid=0x4680 in Object.wait() [0x00007fd8188c7000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.pollNext(Handover.java:73)
        - locked <0x00000000f8c498c8> (a java.lang.Object)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:62)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

"channel-state-unspilling-thread-1" #1431 daemon prio=5 os_prio=0 
tid=0x00007fd84c004800 nid=0x467f waiting on condition [0x00007fd7886c7000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8c905e0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (2/4)#0" #1433 daemon prio=5 os_prio=0 
tid=0x00007fd8a4001000 nid=0x467e waiting on condition [0x00007fd78aceb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8ceac68> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue.getBatchBlocking(ClosableBlockingQueue.java:396)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:240)

"channel-state-unspilling-thread-1" #1432 daemon prio=5 os_prio=0 
tid=0x00007fd874003000 nid=0x467d waiting on condition [0x00007fd7db5f4000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8a78200> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (3/4)#0" #1430 daemon prio=5 os_prio=0 
tid=0x00007fd898001800 nid=0x467c runnable [0x00007fd7893d4000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000f88f9b40> (a sun.nio.ch.Util$3)
        - locked <0x00000000f88f9b30> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000f88f7308> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:794)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)

"channel-state-unspilling-thread-1" #1429 daemon prio=5 os_prio=0 
tid=0x00007fd888002800 nid=0x467b waiting on condition [0x00007fd78a6e5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8c3a188> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"channel-state-unspilling-thread-1" #1428 daemon prio=5 os_prio=0 
tid=0x00007fd884001000 nid=0x467a waiting on condition [0x00007fd7d8dce000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8cfd688> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (2/4)#0" #1414 prio=5 os_prio=0 tid=0x00007fd844002000 
nid=0x4678 in Object.wait() [0x00007fd7898d9000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.pollNext(Handover.java:73)
        - locked <0x00000000f8cc1da8> (a java.lang.Object)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:62)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

"channel-state-unspilling-thread-1" #1426 daemon prio=5 os_prio=0 
tid=0x00007fd844001000 nid=0x4677 waiting on condition [0x00007fd7885c6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8cfdc08> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (3/4)#0" #1412 prio=5 os_prio=0 tid=0x00007fd850004800 
nid=0x4676 in Object.wait() [0x00007fd7894d5000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.pollNext(Handover.java:73)
        - locked <0x00000000f8d17488> (a java.lang.Object)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:62)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

"channel-state-unspilling-thread-1" #1425 daemon prio=5 os_prio=0 
tid=0x00007fd850003800 nid=0x4675 waiting on condition [0x00007fd78a7e6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8d36660> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Thread-93" #1424 prio=5 os_prio=0 tid=0x00007fd884003800 nid=0x4674 runnable 
[0x00007fd7d99d8000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.net.SocketInputStream.read(SocketInputStream.java:224)
        at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
        at org.apache.flink.types.StringValue.readString(StringValue.java:753)
        at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
        at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
        at 
org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest.<init>(CollectCoordinationRequest.java:52)
        at 
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction$ServerThread.run(CollectSinkFunction.java:391)

"channel-state-unspilling-thread-1" #1423 daemon prio=5 os_prio=0 
tid=0x00007fd858001000 nid=0x4673 waiting on condition [0x00007fd78a5e4000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8b79648> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"Sink: Collect table sink (1/1)#0" #1422 prio=5 os_prio=0 
tid=0x00007fda48099000 nid=0x4672 waiting on condition [0x00007fd789ddc000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f898d8c0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4)#0" #1421 prio=5 os_prio=0 
tid=0x00007fda480b2800 nid=0x4671 waiting on condition [0x00007fd818ccb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8b39290> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (4/4)#0" #1418 prio=5 os_prio=0 
tid=0x00007fda480b1800 nid=0x466e waiting on condition [0x00007fd7883c4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8918bf0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (3/4)#0" #1417 prio=5 os_prio=0 
tid=0x00007fda480b0800 nid=0x466d waiting on condition [0x00007fd7da9e8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8a15bc0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (1/4)#0" #1416 daemon prio=5 os_prio=0 
tid=0x00007fd870001000 nid=0x466c waiting on condition [0x00007fd7884c5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8d0d0e0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue.getBatchBlocking(ClosableBlockingQueue.java:396)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:240)

"Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, 
default_database, upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, 
payload, timestamp]) (1/4)#0" #1403 prio=5 os_prio=0 tid=0x00007fd840005000 
nid=0x466b in Object.wait() [0x00007fd788fd0000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.pollNext(Handover.java:73)
        - locked <0x00000000f8d72e58> (a java.lang.Object)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:62)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

"channel-state-unspilling-thread-1" #1415 daemon prio=5 os_prio=0 
tid=0x00007fd840004000 nid=0x466a waiting on condition [0x00007fd78bbfa000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8d45b28> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, 
name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> 
NotNullEnforcer(fields=[k_user_id, k_event_id]) (2/4)#0" #1407 prio=5 os_prio=0 
tid=0x00007fda480b0000 nid=0x4665 waiting on condition [0x00007fd7db9f8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8ad7b68> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"Source: TableSourceScan(table=[[default_catalog, default_database, 
upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, payload, 
timestamp]) (4/4)#0" #1406 prio=5 os_prio=0 tid=0x00007fda480af000 nid=0x4664 
waiting on condition [0x00007fd7899da000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8bf2718> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"Source: TableSourceScan(table=[[default_catalog, default_database, 
upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, payload, 
timestamp]) (3/4)#0" #1405 prio=5 os_prio=0 tid=0x00007fda48084800 nid=0x4663 
waiting on condition [0x00007fd788acb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8ce0488> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

"Source: TableSourceScan(table=[[default_catalog, default_database, 
upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, payload, 
timestamp]) (2/4)#0" #1404 prio=5 os_prio=0 tid=0x00007fda48084000 nid=0x4662 
waiting on condition [0x00007fd788ccd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8d624c8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)


"Source: TableSourceScan(table=[[default_catalog, default_database, 
upsert_kafka]], fields=[k_user_id, name, k_event_id, user_id, payload, 
timestamp]) (1/4)#0" #1400 prio=5 os_prio=0 tid=0x00007fda480b9000 nid=0x465f 
waiting on condition [0x00007fd7d96d5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000f8d88820> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1699/155725233.run(Unknown
 Source)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)
{code}

> UpsertKafkaTableITCase hangs when collecting results
> ----------------------------------------------------
>
>                 Key: FLINK-22416
>                 URL: https://issues.apache.org/jira/browse/FLINK-22416
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.13.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Qingsheng Ren
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.14.0
>
>         Attachments: idea-test.png, threads_report.txt
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17037&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=7002
> {code}
> 2021-04-22T11:16:35.6812919Z Apr 22 11:16:35 [ERROR] 
> testSourceSinkWithKeyAndPartialValue[format = 
> csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)
>   Time elapsed: 30.01 s  <<< ERROR!
> 2021-04-22T11:16:35.6814151Z Apr 22 11:16:35 
> org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6814781Z Apr 22 11:16:35  at 
> java.lang.Thread.sleep(Native Method)
> 2021-04-22T11:16:35.6815444Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237)
> 2021-04-22T11:16:35.6816250Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113)
> 2021-04-22T11:16:35.6817033Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> 2021-04-22T11:16:35.6817719Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-04-22T11:16:35.6818351Z Apr 22 11:16:35  at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> 2021-04-22T11:16:35.6818980Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows(KafkaTableTestUtils.java:52)
> 2021-04-22T11:16:35.6819978Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testSourceSinkWithKeyAndPartialValue(UpsertKafkaTableITCase.java:147)
> 2021-04-22T11:16:35.6820803Z Apr 22 11:16:35  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6821365Z Apr 22 11:16:35  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6822072Z Apr 22 11:16:35  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6822656Z Apr 22 11:16:35  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6823124Z Apr 22 11:16:35  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6823672Z Apr 22 11:16:35  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6824202Z Apr 22 11:16:35  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6824709Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6825230Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6825716Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6826204Z Apr 22 11:16:35  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6826807Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6827378Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6827926Z Apr 22 11:16:35  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6828331Z Apr 22 11:16:35  at 
> java.lang.Thread.run(Thread.java:748)
> 2021-04-22T11:16:35.6828600Z Apr 22 11:16:35 
> 2021-04-22T11:16:35.6829073Z Apr 22 11:16:35 [ERROR] testAggregate[format = 
> csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)
>   Time elapsed: 30.001 s  <<< ERROR!
> 2021-04-22T11:16:35.6829689Z Apr 22 11:16:35 
> org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6830073Z Apr 22 11:16:35  at sun.misc.Unsafe.park(Native 
> Method)
> 2021-04-22T11:16:35.6830468Z Apr 22 11:16:35  at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2021-04-22T11:16:35.6831165Z Apr 22 11:16:35  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> 2021-04-22T11:16:35.6832071Z Apr 22 11:16:35  at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> 2021-04-22T11:16:35.6832927Z Apr 22 11:16:35  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> 2021-04-22T11:16:35.6833427Z Apr 22 11:16:35  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2021-04-22T11:16:35.6833930Z Apr 22 11:16:35  at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
> 2021-04-22T11:16:35.6834497Z Apr 22 11:16:35  at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
> 2021-04-22T11:16:35.6835331Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.wordCountToUpsertKafka(UpsertKafkaTableITCase.java:340)
> 2021-04-22T11:16:35.6836104Z Apr 22 11:16:35  at 
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testAggregate(UpsertKafkaTableITCase.java:72)
> 2021-04-22T11:16:35.6836728Z Apr 22 11:16:35  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6837269Z Apr 22 11:16:35  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6837837Z Apr 22 11:16:35  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6838311Z Apr 22 11:16:35  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6838945Z Apr 22 11:16:35  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6839507Z Apr 22 11:16:35  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6840092Z Apr 22 11:16:35  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6840595Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6841105Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6841738Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6842236Z Apr 22 11:16:35  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6842861Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6843436Z Apr 22 11:16:35  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6843939Z Apr 22 11:16:35  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6844335Z Apr 22 11:16:35  at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to