[ 
https://issues.apache.org/jira/browse/FLINK-17949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126353#comment-17126353
 ] 

Yuan Mei edited comment on FLINK-17949 at 6/5/20, 4:09 AM:
-----------------------------------------------------------

These comments are quoted from [~becket_qin]

"Theoretically speaking, using KafkaAdminClient to create a topic does not 100%
guarantee that a producer will not see the "does not host this topic-partition"
error. This is because the AdminClient can only guarantee that the topic
metadata exists in the broker to which it sent the CreateTopicRequest. When a
producer comes at a later point, it might send a TopicMetadataRequest to a
different broker, and that broker may not have received the updated topic
metadata yet. But this is very unlikely to happen, given that the brokers
usually receive the metadata update at the same time. Having retries
configured on the producer side should be sufficient to handle such cases. We
can also do that for 0.10 and 0.11 producers. But given that we have the
producer properties scattered all over the place (which is something we
probably should avoid, to begin with), it would be simpler to just make sure
the topic has been created successfully before we start the tests."
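
As a rough illustration of the "retries configured on the producer side" suggestion in the quote above, the producer properties could be extended along these lines. `retries` and `retry.backoff.ms` are standard Kafka producer settings; the helper name and the values are assumptions made for this sketch, not what the Flink tests actually use.

```java
import java.util.Properties;

public class ProducerRetryProps {
    // Hypothetical helper: copies the given producer properties and enables
    // retries so that a transient metadata error is retried, not fatal.
    static Properties withRetries(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Assumed values; tune them to the test environment.
        props.setProperty("retries", "5");
        props.setProperty("retry.backoff.ms", "200");
        return props;
    }
}
```

Keeping this in one helper would also address the concern about producer properties being scattered across the tests.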

 

"If we do not want this to happen, we should check the metadata cache of each
broker to make sure it has the topic metadata".
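
A minimal sketch of the "check each broker" idea, assuming some per-broker check is available: poll until every broker reports the topic, or give up after a timeout. The class name, the broker identifiers, and the `hasTopic` predicate are all hypothetical; how to actually query a single broker's metadata cache is not specified in this thread.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Predicate;

public class TopicReadiness {
    // Polls until hasTopic is true for every broker or the timeout elapses.
    // hasTopic is a hypothetical stand-in for a per-broker metadata lookup.
    static boolean awaitTopicOnAllBrokers(
            List<String> brokers,
            Predicate<String> hasTopic,
            Duration timeout,
            Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (brokers.stream().allMatch(hasTopic)) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and report "not ready".
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A test setup could call this right after creating the topic and fail fast if it returns false, rather than starting the producer against a broker that has not yet seen the topic.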

 

I think this is one case that could cause the situation, but I am not sure
whether it is the same case encountered here (LEADER_NOT_AVAILABLE):
{code:java}
13:34:40,854 [kafka-producer-network-thread | producer-7] WARN 
org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-7] 
Error while fetching metadata with correlation id 12 : 
{test_serde_IngestionTime=LEADER_NOT_AVAILABLE}{code}
 



> KafkaShuffleITCase.testSerDeIngestionTime:156->testRecordSerDe:388 
> expected:<310> but was:<0>
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17949
>                 URL: https://issues.apache.org/jira/browse/FLINK-17949
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Tests
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: Robert Metzger
>            Priority: Critical
>              Labels: test-stability
>         Attachments: logs-ci-kafkagelly-1590500380.zip, 
> logs-ci-kafkagelly-1590524911.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2209&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20
> {code}
> 2020-05-26T13:35:19.4022562Z [ERROR] 
> testSerDeIngestionTime(org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase)
>   Time elapsed: 5.786 s  <<< FAILURE!
> 2020-05-26T13:35:19.4023185Z java.lang.AssertionError: expected:<310> but 
> was:<0>
> 2020-05-26T13:35:19.4023498Z  at org.junit.Assert.fail(Assert.java:88)
> 2020-05-26T13:35:19.4023825Z  at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-05-26T13:35:19.4024461Z  at 
> org.junit.Assert.assertEquals(Assert.java:645)
> 2020-05-26T13:35:19.4024900Z  at 
> org.junit.Assert.assertEquals(Assert.java:631)
> 2020-05-26T13:35:19.4028546Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testRecordSerDe(KafkaShuffleITCase.java:388)
> 2020-05-26T13:35:19.4029629Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testSerDeIngestionTime(KafkaShuffleITCase.java:156)
> 2020-05-26T13:35:19.4030253Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-05-26T13:35:19.4030673Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-05-26T13:35:19.4031332Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-05-26T13:35:19.4031763Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-05-26T13:35:19.4032155Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-05-26T13:35:19.4032630Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-05-26T13:35:19.4033188Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-05-26T13:35:19.4033638Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-05-26T13:35:19.4034103Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-05-26T13:35:19.4034593Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-05-26T13:35:19.4035118Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-05-26T13:35:19.4035570Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-05-26T13:35:19.4035888Z  at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
