[ 
https://issues.apache.org/jira/browse/FLINK-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943347#comment-16943347
 ] 

Jiayi Liao edited comment on FLINK-14309 at 10/3/19 5:40 AM:
-------------------------------------------------------------

I could not reproduce the error after dozens of attempts. The most likely 
cause is that the KafkaProducerSink fails to snapshot (but the checkpoint 
succeeds) and the job fails after the first checkpoint. Since the flush() 
method in KafkaProducer does not guarantee delivery, maybe we can first 
increase the retries config value (default=0) and set the acks config value to 
"all" (default=1). I'm not completely sure this will help, but it would be 
better to add these. (The project already sets these configs in the consumer's 
tests.)
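
For illustration, a minimal sketch of the two settings suggested above. It 
uses only java.util.Properties, so it runs without kafka-clients on the 
classpath. The class name, the method name, and the retry count of 3 are 
assumptions made for this example and are not taken from the existing tests; 
"bootstrap.servers", "acks" and "retries" are the standard Kafka producer 
config keys.

```java
import java.util.Properties;

/**
 * Sketch only: builds producer settings with the two changes proposed above.
 * The class and method names are hypothetical, and the retry count is an
 * assumed value (the flush() Javadoc only says "<large_number>").
 */
final class AtLeastOnceProducerProps {

    // Assumed value for illustration; tune to the test environment.
    static final int ASSUMED_RETRIES = 3;

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait until all in-sync replicas have acknowledged each record.
        props.setProperty("acks", "all");
        // Retry a failed produce request up to this many times
        // before the failure is reported to the caller.
        props.setProperty("retries", Integer.toString(ASSUMED_RETRIES));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println("acks=" + props.getProperty("acks")
                + ", retries=" + props.getProperty("retries"));
    }
}
```

In the tests, these entries would presumably be merged into whatever 
Properties object is already handed to the Kafka producer sink.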

Here is the relevant comment from the Javadoc of the flush() method:
{code:java}
     * This example shows how to consume from one Kafka topic and produce to another Kafka topic:
     * <pre>
     * {@code
     * for(ConsumerRecord<String, String> record: consumer.poll(100))
     *     producer.send(new ProducerRecord("my-topic", record.key(), record.value());
     * producer.flush();
     * consumer.commit();
     * }
     * </pre>
     *
     * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
     * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
{code}
cc [~becket_qin] [~trohrmann]


> Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink
>  fails on Travis
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14309
>                 URL: https://issues.apache.org/jira/browse/FLINK-14309
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Till Rohrmann
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.10.0
>
>
> The 
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}
>  fails on Travis with 
> {code}
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  failed with:
> java.lang.AssertionError: Job should fail!
>       at org.junit.Assert.fail(Assert.java:88)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink(KafkaProducerTestBase.java:206)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>       at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>       at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}
> https://api.travis-ci.com/v3/job/240747188/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
