[ https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
bright chen updated APEXMALHAR-2120:
------------------------------------
    Description:
Problems in unit test class KafkaInputOperatorTest:
- 'k' is not initialized for each test case.
- The assert was not correct.
- The test case assumes that END_TUPLE will be received after all normal tuples, but in fact tuples can arrive out of order when multiple clusters or partitions are in use. Here is one example:
2016-06-22 08:54:12,827 [main] INFO kafka.KafkaInputOperatorTest testInputOperator - Number of received/expected tuples: 22/22, testName: testtopic16, tuples:
[c1_2, c1_3, c1_6, c1_9, c1_10, c1_13, c1_14, c1_19, c1_20, END_TUPLE, END_TUPLE, c1_1, c1_4, c1_5, c1_7, c1_8, c1_11, c1_12, c1_15, c1_16, c1_17, c1_18]
- The operator AbstractKafkaInputOperator is implemented as "at least once", but the test case assumes "exactly once".
- RuntimeException: Couldn't replay the offset: see the following log.
- RuntimeException at OneToOnePartitioner.assign(OneToOnePartitioner.java:52) or OneToManyPartitioner.assign(OneToManyPartitioner.java:57), which is probably caused by a NullPointerException. See the following log.

============================================================
Problems in AbstractKafkaInputOperator:
------------------------------------------------------------
- RuntimeException: Couldn't replay the offset:
The test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() with scenario {true, true, "one_to_many"} (multi-cluster: true, multi-partition: true, partition: one_to_many) throws the following exception, and the Collector Module didn't collect any data.

2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception.
java.lang.RuntimeException: Couldn't replay the offset
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
    at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
------------------------------------------------------------
- ConcurrentModificationException

2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
    at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
    at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
    at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
    at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
    at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
    at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
------------------------------------------------------------
Tests run: 24, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 154.711 sec <<< FAILURE!

testInputOperatorWithFailure[multi-cluster: true, multi-partition: true, partition: one_to_one](org.apache.apex.malhar.kafka.KafkaInputOperatorTest) Time elapsed: 0.457 sec <<< ERROR!
java.lang.RuntimeException: Error creating local cluster
    at org.apache.apex.malhar.kafka.OneToOnePartitioner.assign(OneToOnePartitioner.java:52)
    at org.apache.apex.malhar.kafka.AbstractKafkaPartitioner.definePartitions(AbstractKafkaPartitioner.java:110)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:356)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
    at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
    at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406)
    at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299)
    at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
    at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperator(KafkaInputOperatorTest.java:338)
    at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperatorWithFailure(KafkaInputOperatorTest.java:285)

testInputOperatorWithFailure[multi-cluster: false, multi-partition: false, partition: one_to_many](org.apache.apex.malhar.kafka.KafkaInputOperatorTest) Time elapsed: 0.333 sec <<< ERROR!
java.lang.RuntimeException: Error creating local cluster
    at org.apache.apex.malhar.kafka.OneToManyPartitioner.assign(OneToManyPartitioner.java:57)
    at org.apache.apex.malhar.kafka.AbstractKafkaPartitioner.definePartitions(AbstractKafkaPartitioner.java:110)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:356)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
    at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
    at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
    at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406)
    at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299)
    at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
    at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperator(KafkaInputOperatorTest.java:338)
    at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperatorWithFailure(KafkaInputOperatorTest.java:285)

  was:
Problems in unit test class KafkaInputOperatorTest:
- 'k' is not initialized for each test case.
- The assert was not correct.
- The test case assumes that END_TUPLE will be received after all normal tuples, but in fact tuples can arrive out of order when multiple clusters or partitions are in use.
- The operator AbstractKafkaInputOperator is implemented as "at least once", but the test case assumes "exactly once".

============================================================
Problems in AbstractKafkaInputOperator:
------------------------------------------------------------
- RuntimeException: Couldn't replay the offset:
The test case
KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() with scenario {true, true, "one_to_many"} (multi-cluster: true, multi-partition: true, partition: one_to_many) throws the following exception, and the Collector Module didn't collect any data.

2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception.
java.lang.RuntimeException: Couldn't replay the offset
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
    at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
------------------------------------------------------------
- ConcurrentModificationException

2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
    at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
    at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
    at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
    at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
    at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
    at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
    at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
    at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)


> Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
> -----------------------------------------------------------------
>
>                 Key: APEXMALHAR-2120
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>    Affects Versions: 3.4.0
>            Reporter: bright chen
>            Assignee: bright chen
>             Fix For: 3.5.0

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
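
The test-side problems in the description (END_TUPLE arriving before data tuples from other partitions, and "at least once" delivery producing duplicates) suggest an assertion that ignores both order and repeats. The following is only a minimal sketch of that idea, assuming tuples are plain strings; TupleCheck and receivedAll are hypothetical names and are not taken from the Malhar code base or from the actual fix.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Order-insensitive, duplicate-tolerant check of collected tuples (illustrative only). */
final class TupleCheck {
  static final String END_TUPLE = "END_TUPLE";

  /**
   * Returns true when every expected data tuple was received at least once,
   * no unexpected data tuple was received, and at least expectedEndTuples
   * END_TUPLE markers arrived, regardless of their position in the list.
   */
  static boolean receivedAll(List<String> received, Set<String> expected, int expectedEndTuples) {
    Set<String> distinct = new HashSet<>();
    int endTuples = 0;
    for (String tuple : received) {
      if (END_TUPLE.equals(tuple)) {
        endTuples++;          // markers may arrive before data from other partitions
      } else {
        distinct.add(tuple);  // a replayed duplicate collapses into one entry
      }
    }
    return distinct.equals(expected) && endTuples >= expectedEndTuples;
  }

  public static void main(String[] args) {
    Set<String> expected = new HashSet<>(Arrays.asList("c1_1", "c1_2", "c1_3"));
    // END_TUPLE arrives in the middle and c1_2 is delivered twice.
    List<String> received = Arrays.asList("c1_2", "END_TUPLE", "c1_1", "c1_2", "c1_3");
    System.out.println(receivedAll(received, expected, 1)); // prints "true"
  }
}
```

Comparing a set of distinct tuples, rather than a count or a position, is what lets the same check pass for both the in-order single-partition case and the interleaved multi-partition case shown in the example log.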