bright chen created APEXMALHAR-2120: ---------------------------------------
Summary: Fix bugs on KafkaInputOperatorTest AbstractKafkaInputOperator Key: APEXMALHAR-2120 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120 Project: Apache Apex Malhar Issue Type: Bug Affects Versions: 3.4.0 Reporter: bright chen Assignee: bright chen Fix For: 3.5.0 problems in Unit Test class: KafkaInputOperatorTest - 'k' not initialized for each test case - The assert was not correct - The test case assume the END_TUPLE will be received at the end of normal tuples, but in fact the tuples could be out of order where support multiple cluster or partition - The operator AbstractKafkaInputOperator implemented as "at least once", but the test case assume "exactly once" problem of AbstractKafkaInputOperator: For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() with senario {true, true, "one_to_many"} ("multi-cluster: true, multi-partition: true, partition: "one_to_many") throws following exception and the Collector Module didn't collect any data. 2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception. java.lang.RuntimeException: Couldn't replay the offset at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146) at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261) at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250) at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122) at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407) Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1 at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288) at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167) at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895) -- This message was sent by Atlassian JIRA (v6.3.4#6332)