Hello All,

While helping an Apex user, I found that the current AbstractRabbitMQInputOperator could be made simpler to use. After investigating, I would like to suggest the improvements below. If more improvements are needed, please suggest them and I will incorporate them when creating the Jira ticket.
In the current code for AbstractRabbitMQInputOperator, exchange and exchangeType are marked NotNull, so an app cannot be launched without specifying these values. The input operator also tries to create the queue and the exchange with the specified values. This leads to conflicts in some scenarios: when the default exchange is used for the queue, and when the queue is transient. To consume from RabbitMQ, the operator only needs the queue name and the host and port of RabbitMQ. Similar to KafkaInputOperator, we can let the operator fail if the queue name is not specified, and let the developer correct the application or specify it through configuration.

*Suggested Improvements:*
1) Drop the requirement to specify the exchange and its type.
2) We should not attempt to create the queue in the input operator. For a consumer, the queue name alone is sufficient to start consuming data from the queue.
3) Currently the queue name is optional; we should make it mandatory instead of creating a queue.

I tried out the following scenarios to test the existing operator.
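As an aside before the scenarios, suggestions 1) to 3) could be sketched roughly as the fail-fast check below. This is only a minimal sketch: the class RabbitMQConsumerSettings and its validate() method are hypothetical names for illustration, not part of the existing operator, and it assumes that host, port and queue name are the only required settings.

```java
/** Hypothetical settings holder for a consume-only operator; not the actual operator class. */
final class RabbitMQConsumerSettings {
  private final String host;
  private final int port;
  private final String queueName;

  RabbitMQConsumerSettings(String host, int port, String queueName) {
    this.host = host;
    this.port = port;
    this.queueName = queueName;
  }

  /** Fails fast when a required setting is missing; exchange and exchangeType are not required. */
  void validate() {
    if (queueName == null || queueName.trim().isEmpty()) {
      throw new IllegalStateException("queueName must be specified");
    }
    if (host == null || host.trim().isEmpty()) {
      throw new IllegalStateException("host must be specified");
    }
    if (port <= 0 || port > 65535) {
      throw new IllegalStateException("port must be between 1 and 65535");
    }
  }

  public static void main(String[] args) {
    // Valid: only queue name, host and port are given.
    new RabbitMQConsumerSettings("localhost", 5672, "test_2").validate();
    try {
      // Invalid: no queue name, so the check throws instead of creating a queue.
      new RabbitMQConsumerSettings("localhost", 5672, null).validate();
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

The point of the design is that a missing queue name becomes a clear startup error rather than a silently created queue.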
*Scenarios with default exchange:*

*1) Queue is already created as non-durable with default exchange*

*Setup:*
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - operation not permitted on the default exchange, class-id=40, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128)

*Reason:* The default exchange is specified in the code as "". Although we are only trying to consume from a specified, already created queue, the operator crashes because the exchangeDeclare() call fails.
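The 403 in this scenario comes from declaring the default exchange. Below is a minimal sketch of a guard that would skip the declaration in that case. ExchangeGuard and its methods are hypothetical names, not part of the operator, and treating a null exchange the same as the empty name is my assumption.

```java
/** Hypothetical helper; illustrates skipping the declare step for the default exchange. */
final class ExchangeGuard {
  private ExchangeGuard() {
  }

  /** The default exchange is addressed by the empty name (a null name is treated the same here, by assumption). */
  static boolean isDefaultExchange(String exchange) {
    return exchange == null || exchange.isEmpty();
  }

  /** Only a named, non-default exchange is a candidate for an exchangeDeclare() call. */
  static boolean shouldDeclareExchange(String exchange) {
    return !isDefaultExchange(exchange);
  }

  public static void main(String[] args) {
    System.out.println(shouldDeclareExchange(""));             // default exchange: skip declare
    System.out.println(shouldDeclareExchange("new_exchange")); // custom exchange: declare is possible
  }
}
```

If suggestion 1) is adopted and the operator stops declaring exchanges altogether, a guard like this becomes unnecessary.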
*2) Queue is not created before launching the app*

*Setup:*
rabbitMQInputOperator.setQueueName("test");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - operation not permitted on the default exchange, class-id=40, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128)

*Reason:* The current operator fails when it tries to declare the default exchange.

*Scenarios with custom exchanges:*

*1) Queue "test_2" created as non-durable with exchange "new_exchange" in RabbitMQ*

*Setup:*
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:* Caused by a mismatch in the "durable" parameter during declaration:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'new_exchange' in vhost '/': received 'false' but current is 'true', class-id=40, method-id=10)

*Reason:* The existing queue was transient (non-durable) while the code tries to create a durable queue with the same name. Note that the reply-text above reports the "durable" mismatch on the exchange "new_exchange", so the exchange declaration is affected by the same kind of conflict.
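To illustrate why re-declaring an existing entity with a different "durable" flag fails, while a passive (existence-only) check would not, here is a toy in-memory model. ToyBroker is purely illustrative and is not the RabbitMQ client API; the "406" and "404" strings only mimic the AMQP reply codes a broker would send.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory stand-in for a broker's queue registry; not the RabbitMQ client API. */
final class ToyBroker {
  // Maps queue name to its durable flag.
  private final Map<String, Boolean> durableByQueue = new HashMap<>();

  /** Active declare: creates the queue, or fails if it already exists with a different durable flag. */
  void declare(String queue, boolean durable) {
    Boolean existing = durableByQueue.get(queue);
    if (existing == null) {
      durableByQueue.put(queue, durable);
    } else if (existing != durable) {
      throw new IllegalStateException(
          "406 PRECONDITION_FAILED: inequivalent arg 'durable' for queue '" + queue + "'");
    }
  }

  /** Passive declare: only checks that the queue exists; never creates it or compares properties. */
  void declarePassive(String queue) {
    if (!durableByQueue.containsKey(queue)) {
      throw new IllegalStateException("404 NOT_FOUND: no queue '" + queue + "'");
    }
  }

  public static void main(String[] args) {
    ToyBroker broker = new ToyBroker();
    broker.declare("test_2", false);   // queue created up front as non-durable
    broker.declarePassive("test_2");   // consumer-style check succeeds
    try {
      broker.declare("test_2", true);  // re-declare as durable: mismatch
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With the real Java client, the corresponding calls would be Channel.queueDeclare(...) and Channel.queueDeclarePassive(...); a consumer that uses only the passive form does not need to know how the queue was created.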
*2) Queue and exchange are not created (declared) in RabbitMQ*

*Setup:* No queue or exchange is present in RabbitMQ. The app with the following configuration ends up creating a durable queue "test_2" bound to a new exchange "new_exchange" of type "fanout", with routing key "".
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Result:* External entities can now push data to the newly created exchange and queue.

*3) Queue and exchange already exist in RabbitMQ, with the queue durable and the exchange and exchange type as specified*

*Setup:*
rabbitMQInputOperator.setQueueName("test");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setRoutingKey("test");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Result:* Worked with no issues. However, this requires that a durable queue has already been created with the proper exchange and routing key.

*4) Queue name not specified in the app, but exchange and exchangeType specified for the operator*

*Setup:*
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setRoutingKey("test");
rabbitMQInputOperator.setPort(5672);

*Result:* The operator ended up creating a queue with a unique generated name such as "amq.gen-dHDsywLO-8eV8qZM4Q4T_w", which gets auto-deleted when the last consumer stops consuming from it.

Thanks & Regards,
Vikram