bright chen created APEXMALHAR-2120:
---------------------------------------

             Summary: Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
                 Key: APEXMALHAR-2120
                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
             Project: Apache Apex Malhar
          Issue Type: Bug
    Affects Versions: 3.4.0
            Reporter: bright chen
            Assignee: bright chen
             Fix For: 3.5.0


Problems in the unit test class KafkaInputOperatorTest:
- 'k' is not initialized for each test case.
- The assertion is not correct.
- The test case assumes that END_TUPLE will be received after all normal
tuples, but in fact the tuples can arrive out of order when multiple
clusters or partitions are used.
- The operator AbstractKafkaInputOperator is implemented as "at least once", but
the test case assumes "exactly once".
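
One way a test could tolerate both points above (out-of-order END_TUPLE and
"at least once" duplicates) is to compare the set of distinct tuples against
the number of messages produced, instead of relying on tuple order or raw
counts. The following is only a sketch under that assumption; the class
TupleCollectorSketch and its methods are hypothetical and are not part of
KafkaInputOperatorTest or Malhar.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Malhar: collects tuples in a way that
// ignores arrival order and duplicate deliveries.
class TupleCollectorSketch {
    static final String END_TUPLE = "END_TUPLE";

    private final int expectedDistinct;            // number of distinct messages the test produced
    private final Set<String> distinctTuples = new HashSet<>();
    private int endTuplesSeen = 0;

    TupleCollectorSketch(int expectedDistinct) {
        this.expectedDistinct = expectedDistinct;
    }

    // END_TUPLE markers are counted separately and never treated as data,
    // so a marker arriving before the last normal tuple does no harm.
    void collect(String tuple) {
        if (END_TUPLE.equals(tuple)) {
            endTuplesSeen++;
        } else {
            distinctTuples.add(tuple);             // replayed duplicates collapse in the set
        }
    }

    // Complete once every expected distinct message has been seen,
    // regardless of where the END_TUPLE markers arrived.
    boolean isComplete() {
        return distinctTuples.size() >= expectedDistinct;
    }

    int distinctCount() {
        return distinctTuples.size();
    }

    int endTuplesSeen() {
        return endTuplesSeen;
    }

    public static void main(String[] args) {
        TupleCollectorSketch collector = new TupleCollectorSketch(3);
        // END_TUPLE arrives early and "m2" is delivered twice.
        String[] received = {"m1", END_TUPLE, "m2", "m2", "m3"};
        for (String tuple : received) {
            collector.collect(tuple);
        }
        System.out.println("complete=" + collector.isComplete()
            + " distinct=" + collector.distinctCount());
    }
}
```

This assumes that message payloads are unique within a test run; if they are
not, the set would need to be keyed on something like partition and offset.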


Problem in AbstractKafkaInputOperator:
The test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure()
with the scenario {true, true, "one_to_many"}
(multi-cluster: true, multi-partition: true, partition: "one_to_many") throws
the following exception, and the Collector Module did not collect any data.
2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception. java.lang.RuntimeException: Couldn't replay the offset
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
    at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
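
For context, the Kafka consumer raises NoOffsetForPartitionException when a
partition has no valid position and the "auto.offset.reset" property is set
to "none". The sketch below shows how that property could be set on a
consumer configuration using plain java.util.Properties. It only illustrates
the setting; it is not necessarily how the operator was fixed. The class
name, the broker address and the group id are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper, not part of Malhar: builds consumer properties with an
// explicit offset reset policy.
class ConsumerResetPolicySketch {

    static Properties consumerProperties(String bootstrapServers, String resetPolicy) {
        // "earliest", "latest" and "none" are the values accepted by the
        // Kafka consumer for auto.offset.reset.
        if (!resetPolicy.equals("earliest")
                && !resetPolicy.equals("latest")
                && !resetPolicy.equals("none")) {
            throw new IllegalArgumentException("Unknown reset policy: " + resetPolicy);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "kafka-input-operator-test"); // placeholder group id
        // With "none", a partition without a valid offset leads to
        // NoOffsetForPartitionException instead of a silent reset.
        props.setProperty("auto.offset.reset", resetPolicy);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = consumerProperties("localhost:9092", "earliest");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

An alternative, when the replay offset is known, is to position the consumer
explicitly on each assigned partition before polling, so that no reset policy
is consulted.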



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
