[
https://issues.apache.org/jira/browse/FLINK-19054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192027#comment-17192027
]
Jiangjie Qin commented on FLINK-19054:
--------------------------------------
[~rmetzger] The test has two steps.
# Write some records to a Kafka topic.
# Consume from that Kafka topic and writes to a dummy sink.
The expected behavior is that after processing some messages, the dummy sink
will throw an exception to cause the job to exit. However, it seems that the
job never exited, therefore eventually the test timed out.
>From the stack trace it is unclear what caused the problem. It could be
>because the message written at the first step were somehow lost so the topic
>does not have any records. We can add some check in the test to verify that.
>Also, it seems that this issue occurs recently, it might be related to some
>recent change.
> KafkaTableITCase.testKafkaSourceSink hangs
> ------------------------------------------
>
> Key: FLINK-19054
> URL: https://issues.apache.org/jira/browse/FLINK-19054
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / API
> Affects Versions: 1.11.2
> Reporter: Dian Fu
> Priority: Critical
> Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=5844&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20]
> {code}
> 2020-08-25T09:04:57.3569768Z "Kafka Fetcher for Source:
> KafkaTableSource(price, currency, log_date, log_time, log_ts) ->
> SourceConversion(table=[default_catalog.default_database.kafka, source:
> [KafkaTableSource(price, currency, log_date, log_time, log_ts)]],
> fields=[price, currency, log_date, log_time, log_ts]) -> Calc(select=[(price
> + 1.0:DECIMAL(2, 1)) AS computed-price, price, currency, log_date, log_time,
> log_ts, (log_ts + 1000:INTERVAL SECOND) AS ts]) ->
> WatermarkAssigner(rowtime=[ts], watermark=[ts]) -> Calc(select=[ts, log_date,
> log_time, CAST(ts) AS ts0, price]) (1/1)" #1501 daemon prio=5 os_prio=0
> tid=0x00007f250000b800 nid=0x22b8 runnable [0x00007f2127efd000]
> 2020-08-25T09:04:57.3571373Z java.lang.Thread.State: RUNNABLE
> 2020-08-25T09:04:57.3571672Z at sun.nio.ch.FileDispatcherImpl.read0(Native
> Method)
> 2020-08-25T09:04:57.3572191Z at
> sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> 2020-08-25T09:04:57.3572921Z at
> sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> 2020-08-25T09:04:57.3573419Z at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> 2020-08-25T09:04:57.3573957Z at
> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
> 2020-08-25T09:04:57.3574809Z - locked <0x00000000fde5a308> (a
> java.lang.Object)
> 2020-08-25T09:04:57.3575448Z at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> 2020-08-25T09:04:57.3576309Z at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> 2020-08-25T09:04:57.3577086Z at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> 2020-08-25T09:04:57.3577727Z at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> 2020-08-25T09:04:57.3578403Z at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> 2020-08-25T09:04:57.3579486Z at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> 2020-08-25T09:04:57.3580240Z at
> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> 2020-08-25T09:04:57.3580880Z at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> 2020-08-25T09:04:57.3581756Z at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> 2020-08-25T09:04:57.3583015Z at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> 2020-08-25T09:04:57.3583847Z at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> 2020-08-25T09:04:57.3584555Z at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> 2020-08-25T09:04:57.3585197Z at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
> 2020-08-25T09:04:57.3585961Z at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)