[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333107#comment-15333107
 ] 

ASF GitHub Bot commented on APEXMALHAR-2120:
--------------------------------------------

GitHub user brightchen reopened a pull request:

    https://github.com/apache/apex-malhar/pull/320

    APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperato…

    …rTest and AbstractKafkaInputOperator

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brightchen/apex-malhar APEXMALHAR-2120

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #320
    
----
commit 6f99fb20da7214f493b442ce4ba3f447ec51cd13
Author: brightchen <bri...@datatorrent.com>
Date:   2016-06-14T23:30:17Z

    APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperatorTest 
and AbstractKafkaInputOperator

----


> Fix bugs on KafkaInputOperatorTest AbstractKafkaInputOperator
> -------------------------------------------------------------
>
>                 Key: APEXMALHAR-2120
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>    Affects Versions: 3.4.0
>            Reporter: bright chen
>            Assignee: bright chen
>             Fix For: 3.5.0
>
>
> problems in Unit Test class: KafkaInputOperatorTest
> - 'k' not initialized for each test case
> - The assert was not correct
> - The test case assume the END_TUPLE will be received at the end of normal 
> tuples, but in fact the tuples could be out of order where support multiple 
> cluster or partition
> - The operator AbstractKafkaInputOperator implemented as "at least once", but 
> the test case assume "exactly once"
> problem of AbstractKafkaInputOperator:
> For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() 
> with senario
> {true, true, "one_to_many"}
> ("multi-cluster: true, multi-partition: true, partition: "one_to_many") 
> throws following exception and the Collector Module didn't collect any data.
> 2016-06-15 10:43:56,358 [1/Kafka 
> inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster 
> log - container-6 msg: Stopped running due to an exception. 
> java.lang.RuntimeException: Couldn't replay the offset
> at 
> org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
> at 
> org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
> at 
> org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
> at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
> at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: 
> Undefined offset with no reset policy for partition: testtopic0-1
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to