[
https://issues.apache.org/jira/browse/FLINK-17949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126353#comment-17126353
]
Yuan Mei edited comment on FLINK-17949 at 6/5/20, 4:09 AM:
-----------------------------------------------------------
These comments are quoted from [~becket_qin]
"Theoretically speaking, using KafkaAdminClient to create a topic does not 100%
guarantee that a producer will not see the "does not host this topic-partition"
error. This is because the AdminClient can only guarantee that the topic
metadata exists in the broker to which it sent the
CreateTopicRequest. When a producer comes at a later point, it might send a
TopicMetadataRequest to a different broker, and that broker may not have received
the updated topic metadata yet. But this is very unlikely to happen, given that
brokers usually receive the metadata update at about the same time. Having retries
configured on the producer side should be sufficient to handle such cases. We
can also do that for the 0.10 and 0.11 producers. But given that we have the
producer properties scattered all over the place (which is something we probably
should avoid to begin with), it would be simpler to just make sure the topic
has been created successfully before we start the tests."
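As a rough illustration of the "retries configured on the producer side" idea quoted above, the sketch below builds the producer properties in a single helper so the retry settings are not scattered. The class name, method name, and the concrete values are assumptions made for illustration; only the keys "bootstrap.servers", "retries", and "retry.backoff.ms" are standard Kafka producer configuration names.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Flink test code base.
final class ProducerRetryProps {

    /** Builds producer properties in one place, including the retry settings. */
    static Properties withRetries(String bootstrapServers, int retries, long backoffMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry transient send failures instead of failing on the first error.
        props.setProperty("retries", Integer.toString(retries));
        // Wait between attempts so brokers have time to receive the metadata update.
        props.setProperty("retry.backoff.ms", Long.toString(backoffMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values only; tune them for the actual test environment.
        Properties props = withRetries("localhost:9092", 5, 200L);
        System.out.println(props);
    }
}
```

The resulting Properties object could then be passed to whichever producer the test constructs; whether that is enough to avoid the failure seen here is not confirmed by this comment.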
"If we do not want this to happen, we should check the metadata cache of each
broker to make sure it has the topic metadata."
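The per-broker check suggested above could be sketched as a small polling helper. This is only a sketch under assumptions: the class TopicReadiness and the brokerHasTopic probe are hypothetical, and how a single broker's metadata cache would actually be queried is deliberately left to the caller.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not part of Flink or Kafka.
final class TopicReadiness {

    /**
     * Polls until every broker reports the topic or the timeout elapses.
     * brokerHasTopic is a caller-supplied (hypothetical) probe of one broker.
     * Returns true if all brokers reported the topic in time, false otherwise.
     */
    static <B> boolean awaitOnAllBrokers(
            List<B> brokers, Predicate<B> brokerHasTopic, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            // allMatch short-circuits on the first broker that lacks the topic.
            if (brokers.stream().allMatch(brokerHasTopic)) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and report "not ready".
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A test setup could call this right after topic creation and fail fast when it returns false, rather than letting the producer hit the error later.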
I think this is one possible cause of the situation, but I am not sure whether
it is the same case as the one encountered here (LEADER_NOT_AVAILABLE):
{code:java}
13:34:40,854 [kafka-producer-network-thread | producer-7] WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-7] Error while fetching metadata with correlation id 12 : {test_serde_IngestionTime=LEADER_NOT_AVAILABLE}
{code}
> KafkaShuffleITCase.testSerDeIngestionTime:156->testRecordSerDe:388
> expected:<310> but was:<0>
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-17949
> URL: https://issues.apache.org/jira/browse/FLINK-17949
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Tests
> Affects Versions: 1.11.0, 1.12.0
> Reporter: Robert Metzger
> Priority: Critical
> Labels: test-stability
> Attachments: logs-ci-kafkagelly-1590500380.zip,
> logs-ci-kafkagelly-1590524911.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2209&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20
> {code}
> 2020-05-26T13:35:19.4022562Z [ERROR] testSerDeIngestionTime(org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase) Time elapsed: 5.786 s <<< FAILURE!
> 2020-05-26T13:35:19.4023185Z java.lang.AssertionError: expected:<310> but was:<0>
> 2020-05-26T13:35:19.4023498Z at org.junit.Assert.fail(Assert.java:88)
> 2020-05-26T13:35:19.4023825Z at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-05-26T13:35:19.4024461Z at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-05-26T13:35:19.4024900Z at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-05-26T13:35:19.4028546Z at org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testRecordSerDe(KafkaShuffleITCase.java:388)
> 2020-05-26T13:35:19.4029629Z at org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testSerDeIngestionTime(KafkaShuffleITCase.java:156)
> 2020-05-26T13:35:19.4030253Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-05-26T13:35:19.4030673Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-05-26T13:35:19.4031332Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-05-26T13:35:19.4031763Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-05-26T13:35:19.4032155Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-05-26T13:35:19.4032630Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-05-26T13:35:19.4033188Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-05-26T13:35:19.4033638Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-05-26T13:35:19.4034103Z at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-05-26T13:35:19.4034593Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-05-26T13:35:19.4035118Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-05-26T13:35:19.4035570Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-05-26T13:35:19.4035888Z at java.lang.Thread.run(Thread.java:748)
> {code}