+1 for the improvements. This way it will become easier for a new user to try out the operator as well.
Thanks,
Shubham Pathak

On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <patilvik...@gmail.com> wrote:

> Hello All,
>
> While helping one of the Apex users, I found that the current
> AbstractRabbitMQInputOperator can be made simpler to use. After
> investigating, I would like to suggest the improvements below. If more
> improvements are needed, please suggest them and I will incorporate them
> when creating the Jira ticket.
>
> In the current code for AbstractRabbitMQInputOperator, exchange and
> exchangeType are marked NotNull, which does not allow an app to be launched
> without specifying these values. The input operator actually tries to
> create queues and exchanges with the specified values. This leads to
> conflicts in some scenarios: when the default exchange is used for the
> queue, and when the queue is transient. To consume from RabbitMQ, the
> operator needs only the queue name and the host and port of RabbitMQ.
> Similar to KafkaInputOperator, we can let the operator fail if the queue
> name is not specified and let the developer correct the application or
> specify it through configuration.
>
> *Suggested Improvements:*
>
> 1) Drop the requirement to specify the exchange and its type.
> 2) We should not attempt to create the queue in the input operator. For a
> consumer, the queue name alone is sufficient to start consuming data from
> the queue.
> 3) Currently the queue name is optional; we should make it mandatory
> instead of creating a queue.
>
> I tried out the following scenarios to test the existing operator.
>
> *Scenarios with default exchange:*
>
> *1) Queue is already created as non-durable with default exchange*
> *Setup:*
> rabbitMQInputOperator.setQueueName("test_2");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Exception:*
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=403,
> reply-text=ACCESS_REFUSED - operation not permitted on the default
> exchange, class-id=40, method-id=10)
>     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128
>
> *Reason:*
> The default exchange is specified in the code as “”. Although we are only
> trying to consume from a specified, already created queue, the operator
> crashes because the exchangeDeclare() call fails.
>
> *2) Queue is not created before launching an app*
> *Setup:*
> rabbitMQInputOperator.setQueueName("test");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Exception:*
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=403,
> reply-text=ACCESS_REFUSED - operation not permitted on the default
> exchange, class-id=40, method-id=10)
>     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128
>
> *Reason:*
> The current operator failed while trying to create the default exchange.
>
> *Scenarios with Custom Exchanges:*
>
> *1) Queue “test_2” created as non-durable with exchange “new_exchange” in
> RabbitMQ*
> *Setup:*
> rabbitMQInputOperator.setQueueName("test_2");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Exception:*
> The exception is caused by a mismatch in the “durable” parameter during
> declaration.
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange
> 'new_exchange' in vhost '/': received 'false' but current is 'true',
> class-id=40, method-id=10)
>
> *Reason:*
> The existing queue was transient (non-durable), while the code tries to
> create a durable queue with the same name.
>
> *2) Queue and exchange are not created (declared) in RabbitMQ*
> *Setup:*
> No queue or exchange is present in RabbitMQ.
> An app with the following configuration ends up creating a durable queue
> “test_2” on a new exchange “new_exchange” of type “fanout”, with “” as the
> routing key.
>
> rabbitMQInputOperator.setQueueName("test_2");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Result:*
> External entities can now push data to the newly created exchange and
> queue.
>
> *3) Queue and exchange are already created in RabbitMQ, with the queue as
> durable and the exchange and exchange type as specified*
> *Setup:*
> rabbitMQInputOperator.setQueueName("test");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setRoutingKey("test");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Result:*
> Worked with no issues. However, it requires that a durable queue has
> already been created with the proper exchange and routing key.
>
> *4) Queue name not specified in the app, but exchange and exchangeType
> specified for the operator*
> *Setup:*
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setRoutingKey("test");
> rabbitMQInputOperator.setPort(5672);
>
> *Result:*
> The operator ended up creating a queue with a unique generated name such
> as “amq.gen-dHDsywLO-8eV8qZM4Q4T_w”, which gets auto-deleted when the last
> consumer stops consuming from it.
>
>
> Thanks & Regards,
> Vikram
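
For illustration, the three suggested improvements could be sketched roughly as below. This is only a sketch under stated assumptions, not the operator's actual code: `QueueOnlyConsumerSettings`, `validate()` and `requireValid()` are hypothetical names, the `localhost` default is an assumption, and the class never connects to RabbitMQ. It only shows the proposed rule that the queue name, host and port are required while the exchange and exchange type are optional.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for the consumer-side settings discussed in this
 * thread. It is NOT the Malhar operator and does not talk to RabbitMQ.
 */
final class QueueOnlyConsumerSettings {
    private String queueName;          // mandatory under suggestion 3
    private String host = "localhost"; // default value is an assumption of this sketch
    private int port = 5672;           // port used in the scenarios above
    private String exchange;           // optional under suggestion 1
    private String exchangeType;       // optional under suggestion 1

    void setQueueName(String queueName) { this.queueName = queueName; }
    void setHost(String host) { this.host = host; }
    void setPort(int port) { this.port = port; }
    void setExchange(String exchange) { this.exchange = exchange; }
    void setExchangeType(String exchangeType) { this.exchangeType = exchangeType; }

    /** Returns the list of problems found; an empty list means the settings are usable. */
    List<String> validate() {
        List<String> problems = new ArrayList<>();
        if (queueName == null || queueName.trim().isEmpty()) {
            problems.add("queueName must be specified");
        }
        if (host == null || host.trim().isEmpty()) {
            problems.add("host must be specified");
        }
        if (port < 1 || port > 65535) {
            problems.add("port must be between 1 and 65535");
        }
        // exchange and exchangeType are deliberately not checked: a consumer
        // that only reads from an existing queue does not need them.
        return problems;
    }

    /** Fails fast instead of letting the broker generate or create a queue. */
    void requireValid() {
        List<String> problems = validate();
        if (!problems.isEmpty()) {
            throw new IllegalStateException("Invalid RabbitMQ consumer settings: " + problems);
        }
    }

    public static void main(String[] args) {
        QueueOnlyConsumerSettings settings = new QueueOnlyConsumerSettings();
        settings.setQueueName("test_2");
        settings.requireValid(); // passes: no exchange or exchange type needed
        System.out.println("settings accepted");
    }
}
```

With a check like this, scenario 4 above (no queue name) would fail at validation time rather than silently creating an auto-deleted queue.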