[ 
https://issues.apache.org/jira/browse/FLINK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121524#comment-17121524
 ] 

Jiangjie Qin commented on FLINK-12030:
--------------------------------------

[~rmetzger] Technically speaking, this is caused by the same problem of 
asynchronous communication between the controller and the brokers. I explained 
the potential failure case in the PR.

{quote}

Theoretically speaking, using {{KafkaAdminClient}} to create a topic does not 
100% guarantee that a producer will not see the "does not host this 
topic-partition" error. This is because the {{AdminClient}} can only guarantee 
that the topic metadata exists on the broker to which it sent the 
{{CreateTopicRequest}}. When a producer comes along at a later point, it might 
send a {{TopicMetadataRequest}} to a different broker, and that broker may not 
have received the updated topic metadata yet. But this is very unlikely to 
happen, given that the brokers usually receive the metadata update at around 
the same time. Having retries configured on the producer side should be 
sufficient to handle such cases. We can also do that for the 0.10 and 0.11 
producers. But given that we have the producer properties scattered all over 
the place (which is something we probably should avoid to begin with), it would 
be simpler to just make sure the topic has been created successfully before we 
start the tests.

{quote}
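
For reference, the producer-side mitigation mentioned in the quote would amount 
to setting the standard Kafka retry properties on the producer configuration 
used by the tests. A minimal sketch (the helper name and values are 
illustrative, not the actual test code):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerRetrySketch {

    /** Illustrative only; the real producer properties live in the Kafka test environment. */
    static Properties producerPropertiesWithRetries(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Retry transient metadata errors such as
        // "This server does not host this topic-partition" instead of failing immediately.
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
        return props;
    }
}
{code}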

I think we are hitting that issue here. To be absolutely sure no such exception 
is thrown, we need to check each broker to ensure it is aware of the topic.
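
A minimal sketch of such a check, assuming we can point an {{AdminClient}} at 
each broker individually (the helper name, timeout, and polling interval are 
made up for illustration):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicMetadataCheckSketch {

    /** Returns true once every broker individually reports metadata for the topic. */
    static boolean allBrokersKnowTopic(List<String> brokerAddresses, String topic, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (String broker : brokerAddresses) {
            boolean known = false;
            while (!known && System.currentTimeMillis() < deadline) {
                Properties props = new Properties();
                // Point the AdminClient at a single broker so the metadata request
                // cannot be answered by any other node.
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                try (AdminClient admin = AdminClient.create(props)) {
                    TopicDescription description = admin
                            .describeTopics(Collections.singleton(topic))
                            .all()
                            .get()
                            .get(topic);
                    known = description != null && !description.partitions().isEmpty();
                } catch (ExecutionException e) {
                    // Typically UnknownTopicOrPartitionException: this broker has not
                    // received the metadata update yet, so retry after a short pause.
                }
                if (!known) {
                    Thread.sleep(100);
                }
            }
            if (!known) {
                return false;
            }
        }
        return true;
    }
}
{code}

Creating a fresh {{AdminClient}} per broker per attempt is heavy-handed but 
keeps the sketch simple; a real implementation would likely reuse the clients.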

Given that we have a bunch of other KafkaITCase stability tickets to handle at 
this point, I'd suggest doing this a little later unless we see this problem 
come up frequently.

> KafkaITCase.testMultipleSourcesOnePartition is unstable: This server does not 
> host this topic-partition
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12030
>                 URL: https://issues.apache.org/jira/browse/FLINK-12030
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Tests
>    Affects Versions: 1.11.0
>            Reporter: Aljoscha Krettek
>            Assignee: Jiangjie Qin
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.11.0
>
>
> This is a relevant part from the log:
> {code}
> 14:11:45,305 INFO  org.apache.flink.streaming.connectors.kafka.KafkaITCase    
>    - 
> ================================================================================
> Test 
> testMetricsAndEndOfStream(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
>  is running.
> --------------------------------------------------------------------------------
> 14:11:45,310 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>    - 
> ===================================
> == Writing sequence of 300 into testEndOfStream with p=1
> ===================================
> 14:11:45,311 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>    - Writing attempt #1
> 14:11:45,316 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Creating topic testEndOfStream-1
> 14:11:45,863 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Property 
> [transaction.timeout.ms] not specified. Setting it to 3600000 ms
> 14:11:45,910 WARN  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Using 
> AT_LEAST_ONCE semantic, but checkpointing is not enabled. Switching to NONE 
> semantic.
> 14:11:45,921 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting 
> FlinkKafkaInternalProducer (1/1) to produce into default topic 
> testEndOfStream-1
> 14:11:46,006 ERROR org.apache.flink.streaming.connectors.kafka.KafkaTestBase  
>    - Write attempt failed, trying again
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>       at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>       at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.writeSequence(KafkaConsumerTestBase.java:1918)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runEndOfStreamTest(KafkaConsumerTestBase.java:1537)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testMetricsAndEndOfStream(KafkaITCase.java:136)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>       at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
> Failed to send data to Kafka: This server does not host this topic-partition.
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:787)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:658)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:443)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:318)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       ... 1 more
> {code}
> Travis run: https://travis-ci.org/apache/flink/jobs/510953235



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
