[
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372581#comment-17372581
]
Qingsheng Ren commented on FLINK-22416:
---------------------------------------
I checked the logs of the 3 most recent failure instances, and it seems that the
starting offset is not reset to 0 even though the offset reset strategy is
EARLIEST:
For [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19306&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=10718]:
{noformat}
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-103, groupId=null] Seeking to EARLIEST offset of
partition key_full_value_topic_avro-0
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-103, groupId=null] Resetting offset for partition
key_full_value_topic_avro-0 to offset 4.
{noformat}
For [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19196&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=11100]:
{noformat}
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-112, groupId=null] Seeking to EARLIEST offset of
partition key_partial_value_topic_avro-0
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-112, groupId=null] Resetting offset for partition
key_partial_value_topic_avro-0 to offset 4.
{noformat}
For [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18953&view=logs&j=4be4ed2b-549a-533d-aa33-09e28e360cc8&t=0db94045-2aa0-53fa-f444-0130d6933518&l=11801]:
{noformat}
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-94, groupId=null] Seeking to EARLIEST offset of
partition users_avro-0
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-94, groupId=null] Resetting offset for partition
users_avro-1 to offset 5.
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-94, groupId=null] Resetting offset for partition
users_avro-0 to offset 6.
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-94, groupId=null] Seeking to EARLIEST offset of
partition users_avro-1
INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] -
[Consumer clientId=consumer-94, groupId=null] Resetting offset for partition
users_avro-1 to offset 5.
{noformat}
This might explain why collectRows timed out: the Kafka consumer didn't fetch
enough messages because it didn't start reading from offset 0, so the collector
kept waiting for rows that never arrived.
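To make the reasoning above concrete, here is a minimal sketch of the shortfall in plain Java. It does not use the Kafka or Flink APIs. The class name, the six-record partition and the expected row count are hypothetical values chosen for illustration, not taken from the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartOffsetShortfall {

    /** Records a consumer would read from a partition log when it starts at startOffset. */
    public static List<String> readFrom(List<String> partitionLog, int startOffset) {
        // Clamp the start position into the valid range of the log.
        int from = Math.min(Math.max(startOffset, 0), partitionLog.size());
        return new ArrayList<>(partitionLog.subList(from, partitionLog.size()));
    }

    /** Rows a collector is still waiting for once the partition has been drained. */
    public static int missingRows(List<String> partitionLog, int startOffset, int expectedRows) {
        return Math.max(0, expectedRows - readFrom(partitionLog, startOffset).size());
    }

    public static void main(String[] args) {
        // Hypothetical partition holding six records at offsets 0..5.
        List<String> log = Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5");

        // Starting at offset 0, all six expected rows arrive.
        System.out.println(missingRows(log, 0, 6)); // prints 0

        // Starting at offset 4, only two rows arrive; four never show up.
        System.out.println(missingRows(log, 4, 6)); // prints 4
    }
}
```

With a non-zero number of missing rows and no new records written to the topic, a collector that waits for a fixed row count can only end by hitting the test timeout.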
Since each test run creates a new topic, the earliest offset of each partition
is supposed to be zero. I'm still investigating why the Kafka brokers return a
non-zero offset for the EARLIEST reset strategy. This could happen if the Kafka
log exceeds its retention time and is truncated, but that should not be the
cause here, because the default retention time is 7 days and the test does not
run anywhere near that long.
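To check whether other builds show the same symptom, their logs can be scanned for non-zero reset offsets. The following is a rough sketch using only the JDK. The class and method names are hypothetical, and the pattern is based solely on the SubscriptionState message text quoted above, so it may need adjusting for other Kafka client versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ResetOffsetScanner {

    // Matches "Resetting offset for partition <name> to offset <n>".
    // Using \s+ between words tolerates log lines that were wrapped, as in this mail.
    private static final Pattern RESET = Pattern.compile(
            "Resetting\\s+offset\\s+for\\s+partition\\s+(\\S+)\\s+to\\s+offset\\s+(\\d+)");

    /** Returns partition name to reset offset, for every partition reset to a non-zero offset. */
    public static Map<String, Long> nonZeroResets(String logText) {
        Map<String, Long> result = new LinkedHashMap<>();
        Matcher m = RESET.matcher(logText);
        while (m.find()) {
            long offset = Long.parseLong(m.group(2));
            if (offset != 0L) {
                // If a partition is reset more than once, the last value wins.
                result.put(m.group(1), offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String sample =
                "[Consumer clientId=consumer-94, groupId=null] Resetting offset for partition\n"
                + "users_avro-1 to offset 5.\n"
                + "[Consumer clientId=consumer-94, groupId=null] Resetting offset for partition\n"
                + "users_avro-0 to offset 6.\n"
                // Hypothetical healthy entry, included only for contrast.
                + "[Consumer clientId=consumer-1, groupId=null] Resetting offset for partition\n"
                + "fresh_topic-0 to offset 0.\n";
        System.out.println(nonZeroResets(sample)); // prints {users_avro-1=5, users_avro-0=6}
    }
}
```

An empty result for a passing build and a non-empty one for each hanging build would support the suspicion that the non-zero starting offset is what makes the test hang.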
Another point I'd like to mention is that all three failure instances mentioned
above failed with the last format option (avro). This could be just a
coincidence; I'm only providing it as additional information for debugging.
> UpsertKafkaTableITCase hangs when collecting results
> ----------------------------------------------------
>
> Key: FLINK-22416
> URL: https://issues.apache.org/jira/browse/FLINK-22416
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.13.0
> Reporter: Dawid Wysakowicz
> Assignee: Qingsheng Ren
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.14.0
>
> Attachments: idea-test.png, threads_report.txt
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17037&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=7002
> {code}
> 2021-04-22T11:16:35.6812919Z Apr 22 11:16:35 [ERROR]
> testSourceSinkWithKeyAndPartialValue[format =
> csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)
> Time elapsed: 30.01 s <<< ERROR!
> 2021-04-22T11:16:35.6814151Z Apr 22 11:16:35
> org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6814781Z Apr 22 11:16:35 at
> java.lang.Thread.sleep(Native Method)
> 2021-04-22T11:16:35.6815444Z Apr 22 11:16:35 at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237)
> 2021-04-22T11:16:35.6816250Z Apr 22 11:16:35 at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113)
> 2021-04-22T11:16:35.6817033Z Apr 22 11:16:35 at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> 2021-04-22T11:16:35.6817719Z Apr 22 11:16:35 at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-04-22T11:16:35.6818351Z Apr 22 11:16:35 at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> 2021-04-22T11:16:35.6818980Z Apr 22 11:16:35 at
> org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows(KafkaTableTestUtils.java:52)
> 2021-04-22T11:16:35.6819978Z Apr 22 11:16:35 at
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testSourceSinkWithKeyAndPartialValue(UpsertKafkaTableITCase.java:147)
> 2021-04-22T11:16:35.6820803Z Apr 22 11:16:35 at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6821365Z Apr 22 11:16:35 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6822072Z Apr 22 11:16:35 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6822656Z Apr 22 11:16:35 at
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6823124Z Apr 22 11:16:35 at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6823672Z Apr 22 11:16:35 at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6824202Z Apr 22 11:16:35 at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6824709Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6825230Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6825716Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6826204Z Apr 22 11:16:35 at
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6826807Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6827378Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6827926Z Apr 22 11:16:35 at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6828331Z Apr 22 11:16:35 at
> java.lang.Thread.run(Thread.java:748)
> 2021-04-22T11:16:35.6828600Z Apr 22 11:16:35
> 2021-04-22T11:16:35.6829073Z Apr 22 11:16:35 [ERROR] testAggregate[format =
> csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)
> Time elapsed: 30.001 s <<< ERROR!
> 2021-04-22T11:16:35.6829689Z Apr 22 11:16:35
> org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6830073Z Apr 22 11:16:35 at sun.misc.Unsafe.park(Native
> Method)
> 2021-04-22T11:16:35.6830468Z Apr 22 11:16:35 at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2021-04-22T11:16:35.6831165Z Apr 22 11:16:35 at
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> 2021-04-22T11:16:35.6832071Z Apr 22 11:16:35 at
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> 2021-04-22T11:16:35.6832927Z Apr 22 11:16:35 at
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> 2021-04-22T11:16:35.6833427Z Apr 22 11:16:35 at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2021-04-22T11:16:35.6833930Z Apr 22 11:16:35 at
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
> 2021-04-22T11:16:35.6834497Z Apr 22 11:16:35 at
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
> 2021-04-22T11:16:35.6835331Z Apr 22 11:16:35 at
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.wordCountToUpsertKafka(UpsertKafkaTableITCase.java:340)
> 2021-04-22T11:16:35.6836104Z Apr 22 11:16:35 at
> org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testAggregate(UpsertKafkaTableITCase.java:72)
> 2021-04-22T11:16:35.6836728Z Apr 22 11:16:35 at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6837269Z Apr 22 11:16:35 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6837837Z Apr 22 11:16:35 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6838311Z Apr 22 11:16:35 at
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6838945Z Apr 22 11:16:35 at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6839507Z Apr 22 11:16:35 at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6840092Z Apr 22 11:16:35 at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6840595Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6841105Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6841738Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6842236Z Apr 22 11:16:35 at
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6842861Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6843436Z Apr 22 11:16:35 at
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6843939Z Apr 22 11:16:35 at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6844335Z Apr 22 11:16:35 at
> java.lang.Thread.run(Thread.java:748)
> {code}