[ https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333107#comment-15333107 ]
ASF GitHub Bot commented on APEXMALHAR-2120: -------------------------------------------- GitHub user brightchen reopened a pull request: https://github.com/apache/apex-malhar/pull/320 APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperato… …rTest and AbstractKafkaInputOperator You can merge this pull request into a Git repository by running: $ git pull https://github.com/brightchen/apex-malhar APEXMALHAR-2120 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/320.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #320 ---- commit 6f99fb20da7214f493b442ce4ba3f447ec51cd13 Author: brightchen <bri...@datatorrent.com> Date: 2016-06-14T23:30:17Z APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperatorTest and AbstractKafkaInputOperator ---- > Fix bugs on KafkaInputOperatorTest AbstractKafkaInputOperator > ------------------------------------------------------------- > > Key: APEXMALHAR-2120 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120 > Project: Apache Apex Malhar > Issue Type: Bug > Affects Versions: 3.4.0 > Reporter: bright chen > Assignee: bright chen > Fix For: 3.5.0 > > > problems in Unit Test class: KafkaInputOperatorTest > - 'k' not initialized for each test case > - The assert was not correct > - The test case assume the END_TUPLE will be received at the end of normal > tuples, but in fact the tuples could be out of order where support multiple > cluster or partition > - The operator AbstractKafkaInputOperator implemented as "at least once", but > the test case assume "exactly once" > problem of AbstractKafkaInputOperator: > For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() > with senario > {true, true, "one_to_many"} > ("multi-cluster: true, multi-partition: true, partition: "one_to_many") > throws following exception and the Collector Module didn't collect any data. > 2016-06-15 10:43:56,358 [1/Kafka > inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster > log - container-6 msg: Stopped running due to an exception. > java.lang.RuntimeException: Couldn't replay the offset > at > org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146) > at > org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261) > at > org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250) > at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122) > at > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407) > Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: > Undefined offset with no reset policy for partition: testtopic0-1 > at > org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288) > at > org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895) -- This message was sent by Atlassian JIRA (v6.3.4#6332)