[ https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

bright chen updated APEXMALHAR-2120:
------------------------------------
    Description: 
Problems in the unit test class KafkaInputOperatorTest:
- 'k' is not initialized for each test case.
- The assert is not correct.
- The test case assumes that END_TUPLE is received after all normal tuples, 
but in fact tuples can arrive out of order when multiple clusters or 
partitions are in use. Here is one example:
2016-06-22 08:54:12,827 [main] INFO  kafka.KafkaInputOperatorTest testInputOperator - Number of received/expected tuples: 22/22, testName: testtopic16, tuples: 
[c1_2, c1_3, c1_6, c1_9, c1_10, c1_13, c1_14, c1_19, c1_20, END_TUPLE, 
END_TUPLE, c1_1, c1_4, c1_5, c1_7, c1_8, c1_11, c1_12, c1_15, c1_16, c1_17, 
c1_18]

- The operator AbstractKafkaInputOperator is implemented as "at least once", 
but the test case assumes "exactly once".

- RuntimeException: Couldn't replay the offset: see the log below.

- RuntimeException at OneToOnePartitioner.assign(OneToOnePartitioner.java:52) or 
OneToManyPartitioner.assign(OneToManyPartitioner.java:57), which is probably 
caused by a NullPointerException. See the log below.
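
The test-side points above (out-of-order arrival, "at least once" delivery) suggest checking the set of distinct data tuples rather than the position of END_TUPLE or the raw tuple count. A minimal sketch, assuming string tuples as in the log line above; the class and method names are hypothetical and are not taken from KafkaInputOperatorTest:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical test helper; not part of KafkaInputOperatorTest. */
final class TupleCheck {
    static final String END_TUPLE = "END_TUPLE";

    /** Distinct data tuples: drops END_TUPLE markers and duplicates from replays. */
    static Set<String> distinctData(List<String> received) {
        Set<String> data = new LinkedHashSet<>();
        for (String tuple : received) {
            if (!END_TUPLE.equals(tuple)) {
                data.add(tuple);
            }
        }
        return data;
    }

    /** True once every expected data tuple has been seen, in any order. */
    static boolean isComplete(List<String> received, int expectedDataTuples) {
        return distinctData(received).size() >= expectedDataTuples;
    }
}
```

For the 22 tuples in the example log, distinctData yields 20 entries, and the result stays the same if a replay delivers some of them twice.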

====================================================================================================
Problems in AbstractKafkaInputOperator:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- RuntimeException: Couldn't replay the offset:
The test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() 
with the scenario
{true, true, "one_to_many"}
(multi-cluster: true, multi-partition: true, partition: "one_to_many") throws 
the following exception, and the Collector Module does not collect any data.
2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception. java.lang.RuntimeException: Couldn't replay the offset
        at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
        at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
        at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
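
The "Undefined offset with no reset policy" message is what the Kafka consumer reports when a partition has no valid position and auto.offset.reset is set to none. Whether an explicit seek or a reset policy is the right remedy for the operator is not settled by this trace. The sketch below only shows, as an assumption, how a reset policy could be added to a copy of a consumer configuration. The class and method names are hypothetical; auto.offset.reset and its values earliest, latest and none belong to the Kafka consumer configuration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper; not part of the Malhar Kafka operator. */
final class ConsumerPropsSketch {
    // Values accepted by the consumer for auto.offset.reset.
    private static final List<String> POLICIES = Arrays.asList("earliest", "latest", "none");

    /** Returns a copy of base with auto.offset.reset set; base is left untouched. */
    static Properties withResetPolicy(Properties base, String policy) {
        if (!POLICIES.contains(policy)) {
            throw new IllegalArgumentException("Unknown reset policy: " + policy);
        }
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("auto.offset.reset", policy);
        return copy;
    }
}
```

With earliest or latest the consumer picks a starting position itself instead of throwing; with none it keeps failing fast, which may be the preferable behaviour for an idempotent replay.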
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- ConcurrentModificationException
2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
        at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
        at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
        at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
        at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
        at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
        at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
        at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
        at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
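
Both traces show close() being called from the operator's deactivate() path while, presumably, another thread is still using the consumer. One common way to avoid this is to let the polling thread own the consumer for its whole life, so that other threads only signal it to stop. The sketch below illustrates that pattern with a stand-in class; FakeConsumer and PollLoop are hypothetical names and this is not how KafkaConsumerWrapper is implemented. With a real KafkaConsumer, wakeup() is the method documented as safe to call from another thread to interrupt a blocking poll.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in for a client that must only be used from one thread (hypothetical). */
final class FakeConsumer {
    volatile Thread closedBy; // records which thread called close()

    void poll() throws InterruptedException {
        Thread.sleep(5); // pretend to fetch records
    }

    void close() {
        closedBy = Thread.currentThread();
    }
}

/** The poll thread creates no sharing: it alone polls and closes the consumer. */
final class PollLoop implements Runnable {
    private final FakeConsumer consumer = new FakeConsumer();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread thread = new Thread(this, "poll-loop");

    void start() {
        thread.start();
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                consumer.poll();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close(); // closed by the same thread that polls
        }
    }

    /** Safe from any thread: signals the loop and waits, never touches the consumer. */
    void stop() {
        running.set(false);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean closedByPollThread() {
        return consumer.closedBy == thread;
    }
}
```

The design choice here is that stop() only flips a flag and joins, so the shutdown path never races with an in-flight poll.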

---------------------------------------------------------------------------------------------------------
Tests run: 24, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 154.711 sec <<< FAILURE!
testInputOperatorWithFailure[multi-cluster: true, multi-partition: true, partition: one_to_one](org.apache.apex.malhar.kafka.KafkaInputOperatorTest)  Time elapsed: 0.457 sec  <<< ERROR!
java.lang.RuntimeException: Error creating local cluster
        at org.apache.apex.malhar.kafka.OneToOnePartitioner.assign(OneToOnePartitioner.java:52)
        at org.apache.apex.malhar.kafka.AbstractKafkaPartitioner.definePartitions(AbstractKafkaPartitioner.java:110)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:356)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406)
        at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299)
        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
        at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperator(KafkaInputOperatorTest.java:338)
        at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperatorWithFailure(KafkaInputOperatorTest.java:285)

testInputOperatorWithFailure[multi-cluster: false, multi-partition: false, partition: one_to_many](org.apache.apex.malhar.kafka.KafkaInputOperatorTest)  Time elapsed: 0.333 sec  <<< ERROR!
java.lang.RuntimeException: Error creating local cluster
        at org.apache.apex.malhar.kafka.OneToManyPartitioner.assign(OneToManyPartitioner.java:57)
        at org.apache.apex.malhar.kafka.AbstractKafkaPartitioner.definePartitions(AbstractKafkaPartitioner.java:110)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:356)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406)
        at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299)
        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
        at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperator(KafkaInputOperatorTest.java:338)
        at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperatorWithFailure(KafkaInputOperatorTest.java:285)

  was:
Problems in the unit test class KafkaInputOperatorTest:
- 'k' is not initialized for each test case.
- The assert is not correct.
- The test case assumes that END_TUPLE is received after all normal tuples, 
but in fact tuples can arrive out of order when multiple clusters or 
partitions are in use.
- The operator AbstractKafkaInputOperator is implemented as "at least once", 
but the test case assumes "exactly once".

====================================================================================================
Problems in AbstractKafkaInputOperator:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- RuntimeException: Couldn't replay the offset:
The test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() 
with the scenario
{true, true, "one_to_many"}
(multi-cluster: true, multi-partition: true, partition: "one_to_many") throws 
the following exception, and the Collector Module does not collect any data.
2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception. java.lang.RuntimeException: Couldn't replay the offset
        at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
        at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
        at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- ConcurrentModificationException
2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
        at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
        at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
        at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
        at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
        at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
        at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
        at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
        at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
        at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)


> Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
> -----------------------------------------------------------------
>
>                 Key: APEXMALHAR-2120
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>    Affects Versions: 3.4.0
>            Reporter: bright chen
>            Assignee: bright chen
>             Fix For: 3.5.0
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
