[ 
https://issues.apache.org/jira/browse/FLINK-19054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192027#comment-17192027
 ] 

Jiangjie Qin commented on FLINK-19054:
--------------------------------------

[~rmetzger] The test has two steps.
 # Write some records to a Kafka topic.
 # Consume from that Kafka topic and write the records to a dummy sink.

The expected behavior is that, after processing some messages, the dummy sink 
throws an exception that causes the job to exit. However, the job apparently 
never exited, so the test eventually timed out.
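
To illustrate the termination pattern described above, here is a minimal 
stdlib-only sketch. It is not the actual test code: the names 
FailingSinkSketch, CountingSink, ExpectedTerminationException and 
runUntilSinkFails are hypothetical, and a plain list stands in for the Kafka 
topic.

```java
import java.util.List;

/** Stdlib-only sketch of a sink that ends a run by throwing after N records. */
final class FailingSinkSketch {

    /** Hypothetical marker exception; the name is illustrative only. */
    static final class ExpectedTerminationException extends RuntimeException {
        ExpectedTerminationException(String message) {
            super(message);
        }
    }

    /** Counts records and throws once the expected number has been seen. */
    static final class CountingSink {
        private final int expectedRecords;
        private int seen;

        CountingSink(int expectedRecords) {
            this.expectedRecords = expectedRecords;
        }

        void invoke(String record) {
            seen++;
            if (seen >= expectedRecords) {
                throw new ExpectedTerminationException("saw " + seen + " records");
            }
        }
    }

    /** Returns true if the sink terminated the run, false if the input ran out first. */
    static boolean runUntilSinkFails(List<String> records, int expectedRecords) {
        CountingSink sink = new CountingSink(expectedRecords);
        try {
            for (String record : records) {
                sink.invoke(record);
            }
        } catch (ExpectedTerminationException e) {
            return true;
        }
        return false;
    }
}
```

In this sketch the loop simply ends when the input is exhausted. An unbounded 
Kafka source would instead keep polling, so if fewer records than expected 
ever arrive, the sink never throws and the job keeps running until the test 
times out.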

From the stack trace it is unclear what caused the problem. One possibility 
is that the messages written in the first step were somehow lost, so the 
topic does not contain any records. We can add a check to the test to verify 
that. Also, the issue seems to have started occurring only recently, so it 
might be related to a recent change.
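
One possible shape for such a check, as a hedged stdlib-only sketch: compare 
per-partition beginning and end offsets before starting the consuming job. 
The names TopicRecordCheckSketch, countRecords and assertTopicHasRecords are 
hypothetical. In a real test the two maps would presumably come from 
KafkaConsumer.beginningOffsets and KafkaConsumer.endOffsets; here they are 
plain maps keyed by partition number. The sketch also assumes a 
non-compacted, non-transactional topic, where the offset difference equals 
the number of records.

```java
import java.util.Map;

/** Stdlib-only sketch: derive a record count from per-partition offsets. */
final class TopicRecordCheckSketch {

    /** Sums (end offset - beginning offset) over all partitions. */
    static long countRecords(Map<Integer, Long> beginningOffsets,
                             Map<Integer, Long> endOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition missing from beginningOffsets starts at 0.
            long begin = beginningOffsets.getOrDefault(entry.getKey(), 0L);
            total += entry.getValue() - begin;
        }
        return total;
    }

    /** Fails fast if the topic holds fewer records than the test wrote. */
    static void assertTopicHasRecords(Map<Integer, Long> beginningOffsets,
                                      Map<Integer, Long> endOffsets,
                                      long expectedRecords) {
        long actual = countRecords(beginningOffsets, endOffsets);
        if (actual < expectedRecords) {
            throw new IllegalStateException(
                "expected at least " + expectedRecords
                    + " records in the topic but found " + actual);
        }
    }
}
```

Failing fast here would turn a silent hang into an immediate, descriptive 
failure if the records from the first step really were lost.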

> KafkaTableITCase.testKafkaSourceSink hangs
> ------------------------------------------
>
>                 Key: FLINK-19054
>                 URL: https://issues.apache.org/jira/browse/FLINK-19054
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / API
>    Affects Versions: 1.11.2
>            Reporter: Dian Fu
>            Priority: Critical
>              Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5844&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20]
> {code}
> 2020-08-25T09:04:57.3569768Z "Kafka Fetcher for Source: KafkaTableSource(price, currency, log_date, log_time, log_ts) -> SourceConversion(table=[default_catalog.default_database.kafka, source: [KafkaTableSource(price, currency, log_date, log_time, log_ts)]], fields=[price, currency, log_date, log_time, log_ts]) -> Calc(select=[(price + 1.0:DECIMAL(2, 1)) AS computed-price, price, currency, log_date, log_time, log_ts, (log_ts + 1000:INTERVAL SECOND) AS ts]) -> WatermarkAssigner(rowtime=[ts], watermark=[ts]) -> Calc(select=[ts, log_date, log_time, CAST(ts) AS ts0, price]) (1/1)" #1501 daemon prio=5 os_prio=0 tid=0x00007f250000b800 nid=0x22b8 runnable [0x00007f2127efd000]
> 2020-08-25T09:04:57.3571373Z    java.lang.Thread.State: RUNNABLE
> 2020-08-25T09:04:57.3571672Z  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> 2020-08-25T09:04:57.3572191Z  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> 2020-08-25T09:04:57.3572921Z  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> 2020-08-25T09:04:57.3573419Z  at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> 2020-08-25T09:04:57.3573957Z  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
> 2020-08-25T09:04:57.3574809Z  - locked <0x00000000fde5a308> (a java.lang.Object)
> 2020-08-25T09:04:57.3575448Z  at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> 2020-08-25T09:04:57.3576309Z  at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> 2020-08-25T09:04:57.3577086Z  at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> 2020-08-25T09:04:57.3577727Z  at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> 2020-08-25T09:04:57.3578403Z  at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> 2020-08-25T09:04:57.3579486Z  at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> 2020-08-25T09:04:57.3580240Z  at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> 2020-08-25T09:04:57.3580880Z  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> 2020-08-25T09:04:57.3581756Z  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> 2020-08-25T09:04:57.3583015Z  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> 2020-08-25T09:04:57.3583847Z  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> 2020-08-25T09:04:57.3584555Z  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> 2020-08-25T09:04:57.3585197Z  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
> 2020-08-25T09:04:57.3585961Z  at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)