[
https://issues.apache.org/jira/browse/FLINK-12030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121524#comment-17121524
]
Jiangjie Qin commented on FLINK-12030:
--------------------------------------
[~rmetzger] Technically speaking, this is caused by the same underlying problem:
asynchronous metadata propagation between the controller and the brokers. I
explained the potential failure case in the PR.
{quote}
Theoretically speaking, using {{KafkaAdminClient}} to create a topic does not
100% guarantee that a producer will never see the "does not host this
topic-partition" error. This is because the {{AdminClient}} can only guarantee
that the topic metadata exists on the broker to which it sent the
{{CreateTopicRequest}}. When a producer comes along at a later point, it might
send a {{TopicMetadataRequest}} to a different broker, and that broker may not
have received the updated topic metadata yet. This is unlikely to happen in
practice, though, given that the brokers usually receive the metadata update at
roughly the same time. Having retries configured on the producer side should be
sufficient to handle such cases, and we could also do that for the 0.10 and 0.11
producers. But given that we have the producer properties scattered all over the
place (which is something we probably should avoid to begin with), it would be
simpler to just make sure the topic has been created successfully before we
start the tests.
{quote}
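For reference, a minimal sketch of what the producer-side retry configuration
mentioned above could look like. The property names are standard Kafka producer
configs; the helper name and the concrete values are just illustrative, not the
actual change from the PR:
{code:java}
import java.util.Properties;

// Illustrative sketch only (not the actual PR change): standard Kafka producer
// configs that let the client retry transient "does not host this
// topic-partition" metadata errors instead of failing immediately.
static Properties withRetries(Properties producerProps) {
    producerProps.setProperty("retries", "10");            // retry transient send failures
    producerProps.setProperty("retry.backoff.ms", "500");  // back off between retries
    return producerProps;
}
{code}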
I think we are hitting that issue here. To be absolutely sure no such exception
is thrown, we would need to check each broker to ensure it is aware of the
topic; a rough sketch of such a check is below. Given that we have a bunch of
other KafkaITCase stability tickets to handle at this point, I'd suggest doing
this a little later unless we see this problem come up frequently.
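For completeness, such a per-broker check could look roughly like the following.
This is a hypothetical helper, not part of the test base: it points an
{{AdminClient}} at one broker at a time and polls until {{describeTopics}}
succeeds there. Note that the {{AdminClient}} may route metadata requests to any
broker it has discovered, so pinning {{bootstrap.servers}} to a single broker is
only a best-effort way of targeting that broker.
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;

// Hypothetical helper: poll every broker individually until each one can
// describe the topic, i.e. has (very likely) received the metadata update.
static void waitUntilAllBrokersKnowTopic(String topic, Iterable<Node> brokers, long timeoutMs)
        throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    for (Node broker : brokers) {
        Properties props = new Properties();
        // Bootstrap from this broker only; best effort to have it answer the metadata request.
        props.setProperty("bootstrap.servers", broker.host() + ":" + broker.port());
        try (AdminClient admin = AdminClient.create(props)) {
            while (true) {
                try {
                    admin.describeTopics(Collections.singleton(topic)).all().get();
                    break; // this broker knows about the topic
                } catch (ExecutionException e) {
                    // Typically UnknownTopicOrPartitionException: metadata not propagated yet.
                    if (System.currentTimeMillis() > deadline) {
                        throw new IllegalStateException(
                                "Broker " + broker + " did not learn about topic " + topic, e);
                    }
                    Thread.sleep(100);
                }
            }
        }
    }
}
{code}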
> KafkaITCase.testMultipleSourcesOnePartition is unstable: This server does not
> host this topic-partition
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-12030
> URL: https://issues.apache.org/jira/browse/FLINK-12030
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Tests
> Affects Versions: 1.11.0
> Reporter: Aljoscha Krettek
> Assignee: Jiangjie Qin
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> This is a relevant part from the log:
> {code}
> 14:11:45,305 INFO org.apache.flink.streaming.connectors.kafka.KafkaITCase
> -
> ================================================================================
> Test
> testMetricsAndEndOfStream(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
> is running.
> --------------------------------------------------------------------------------
> 14:11:45,310 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestBase
> -
> ===================================
> == Writing sequence of 300 into testEndOfStream with p=1
> ===================================
> 14:11:45,311 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestBase
> - Writing attempt #1
> 14:11:45,316 INFO
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl -
> Creating topic testEndOfStream-1
> 14:11:45,863 WARN
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Property
> [transaction.timeout.ms] not specified. Setting it to 3600000 ms
> 14:11:45,910 WARN
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Using
> AT_LEAST_ONCE semantic, but checkpointing is not enabled. Switching to NONE
> semantic.
> 14:11:45,921 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting
> FlinkKafkaInternalProducer (1/1) to produce into default topic
> testEndOfStream-1
> 14:11:46,006 ERROR org.apache.flink.streaming.connectors.kafka.KafkaTestBase
> - Write attempt failed, trying again
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> at
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.writeSequence(KafkaConsumerTestBase.java:1918)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runEndOfStreamTest(KafkaConsumerTestBase.java:1537)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testMetricsAndEndOfStream(KafkaITCase.java:136)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: This server does not host this topic-partition.
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:787)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:658)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:443)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:318)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> ... 1 more
> {code}
> Travis run: https://travis-ci.org/apache/flink/jobs/510953235