+1 - The queue_name, host, port should be sufficient to start reading the messages.
Regards, Mohit On Fri, Jun 9, 2017 at 5:47 PM, Shubham Pathak <shub...@datatorrent.com> wrote: > +1 for the improvements > This way the it will become easier for a new user to try out the operator > as well. > > Thanks, > Shubham Pathak > > On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <patilvik...@gmail.com> > wrote: > > > Hello All, > > > > While helping one of the Apex User, I found that current > > AbstractRabbitMQInputOperator can be made simpler to use . After > > investigation, I would like to suggest some improvements as below. If > there > > are more improvements needed, please suggest and I will incorporate those > > suggestions to create Jira ticket. > > > > In current code for AbstractRabbitMQInputOperator exchange, exchangeType > > are made NotNull which doesn't allow app to be launched without > specifying > > these values. For an input Operator, we are actually trying to create > > queues and exchanges with specified values. But it leads to conflict in > > some scenarios when default exchange is used for the queue as well when > > queue type is transient. To consume from rabbitmq, operator need to use > > only QueueName, host and port of rabbitmq. Similar to KafkaInputOperator > > we can let the operator fail if QueueName is not specified and let > > developer correct an application or specified it from configuration. > > > > *Suggested Improvements:* > > > > 1) Drop requirements to specify exchange and its type . > > 2) We should not be attempting to create queue in Input Operator. For > > consumer only queue name is sufficient to start consuming data from > queue. > > 3) Currently queue name is optional, we should make it mandatory instead > of > > creating queue. > > > > I tried out following scenarios to test existing operator . > > > > *Scenarios with default exchange:* > > *1) Queue is already created as non-durable with default exchange* > > *Setup:* > > rabbitMQInputOperator.setQueueName("test_2"); > > rabbitMQInputOperator.setExchangeType("fanout"); > > rabbitMQInputOperator.setExchange(""); > > rabbitMQInputOperator.setHost("localhost"); > > rabbitMQInputOperator.setPort(5672); > > > > *Exception:* > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > > protocol method: #method<channel.close>(reply-code=403, > > reply-text=ACCESS_REFUSED - operation not permitted on the default > > exchange, class-id=40, method-id=10) > > at com.rabbitmq.utility.ValueOrException.getValue( > > ValueOrException.java:66) > > at > > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue( > > BlockingValueOrException.java:36) > > at > > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation. > > getReply(AMQChannel.java:398) > > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244) > > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc( > AMQChannel.java:128 > > > > *Reason:* > > Default exchange is specified in the code as “” . Though we are trying to > > consume from specified and already created queue, operator crashes as > > exchangeDeclare() call fails . > > > > *2)Queue is not created before launching an app * > > > > *Setup:* > > rabbitMQInputOperator.setQueueName("test"); > > rabbitMQInputOperator.setExchangeType("fanout"); > > rabbitMQInputOperator.setExchange(""); > > rabbitMQInputOperator.setHost("localhost"); > > rabbitMQInputOperator.setPort(5672); > > > > *Exception:* > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > > protocol method: #method<channel.close>(reply-code=403, > > reply-text=ACCESS_REFUSED - operation not permitted on the default > > exchange, class-id=40, method-id=10) > > at com.rabbitmq.utility.ValueOrException.getValue( > > ValueOrException.java:66) > > at > > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue( > > BlockingValueOrException.java:36) > > at > > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation. > > getReply(AMQChannel.java:398) > > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244) > > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc( > AMQChannel.java:128 > > > > *Reason:* > > Current operator failed to create default exchange . > > > > *Scnenarios with Custom Exchanges:* > > > > *1)Queue “test_2” created as non-durable with exchange “new_exchange” in > > rabbitmq* > > *Setup:* > > rabbitMQInputOperator.setQueueName("test_2"); > > rabbitMQInputOperator.setExchangeType("fanout"); > > rabbitMQInputOperator.setExchange("new_exchange"); > > rabbitMQInputOperator.setHost("localhost"); > > rabbitMQInputOperator.setPort(5672); > > > > *Exception:* > > Exception Caused: due to mismatch in param while declaring queue as > > “durable” > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > > protocol method: #method<channel.close>(reply-code=406, > > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange > > 'new_exchange' in vhost '/': received 'false' but current is 'true', > > class-id=40, method-id=10) > > *Reason:* > > Existing queue was transient ( non-durable ) while code tries to create > > durable queue with the same name. > > > > *2) When Queue and exchange are not created ( declared ) in rabbitmq* > > *Setup:* > > No queue and exchange are present in rabbitmq > > App with following configuration ends up creating durable queue “test_2” > > in new exchange “new_exchange” as “fanout” type and routing key as “”. > > > > rabbitMQInputOperator.setQueueName("test_2"); > > rabbitMQInputOperator.setExchangeType("fanout"); > > rabbitMQInputOperator.setExchange("new_exchange"); > > rabbitMQInputOperator.setHost("localhost"); > > rabbitMQInputOperator.setPort(5672); > > > > *Result:* > > Now external entities can pushed data to newly created exchange and queue > > *3) Queue and exchanges are already created rabbitmq with queue as > durable > > and exchange and exchange Type as specified specified. * > > > > *Setup:* > > > > rabbitMQInputOperator.setQueueName("test"); > > rabbitMQInputOperator.setExchangeType("fanout"); > > rabbitMQInputOperator.setExchange("new_exchange"); > > rabbitMQInputOperator.setRoutingKey("test"); > > rabbitMQInputOperator.setHost("localhost"); > > rabbitMQInputOperator.setPort(5672); > > *Result:* > > Worked with no issues. But it demands that it durable queue has to be > > created with proper exchange and routing key > > > > *4) Queuename not specified in an app but exchange and exchangeType > > specified for Operator * > > > > *Setup:* > > rabbitMQInputOperator.setExchangeType("fanout"); > > rabbitMQInputOperator.setExchange("new_exchange"); > > rabbitMQInputOperator.setHost("localhost"); > > rabbitMQInputOperator.setRoutingKey("test"); > > rabbitMQInputOperator.setPort(5672); > > > > *Result:* > > Operator ended up creating queue with unique names such as > > “amq.gen-dHDsywLO-8eV8qZM4Q4T_w” which gets auto deleted when last > > consumer stops consuming from it. > > > > > > Thanks & Regards, > > Vikram > > > -- Regards, ___________________________________________________ *Mohit Jotwani* Product Manager E: mo...@datatorrent.com | M: +91 97699 62740 www.datatorrent.com | apex.apache.org