[
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543824#comment-14543824
]
ASF GitHub Bot commented on FLINK-2008:
---------------------------------------
GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/675
[FLINK-2008] Fix broker failure test case
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink stephan_kafka
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/675.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #675
----
commit ad449cfd308559734daa493b34d5d40305972c82
Author: Robert Metzger <[email protected]>
Date: 2015-05-13T07:34:37Z
[FLINK-2008] Fix broker failure test case
----
> PersistentKafkaSource is sometimes emitting tuples multiple times
> -----------------------------------------------------------------
>
> Key: FLINK-2008
> URL: https://issues.apache.org/jira/browse/FLINK-2008
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Robert Metzger
>
> The PersistentKafkaSource is expected to emit records exactly once.
> Two test cases of the KafkaITCase are sporadically failing because records
> are emitted multiple times.
> Affected tests:
> {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been
> changed manually in ZK:
> {code}
> java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0
> array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
> 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
> 2]
> {code}
> {{brokerFailureTest()}} also fails:
> {code}
> 05/13/2015 08:13:16 Custom source -> Stream Sink(1/1) switched to FAILED
> java.lang.AssertionError: Received tuple with value 21 twice
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at org.junit.Assert.assertFalse(Assert.java:64)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
> at
> org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
> at
> org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
> at
> org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
> at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
> at
> org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
> at
> org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
> at
> org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)