Hello All,

While helping one of the Apex users, I found that the current
AbstractRabbitMQInputOperator can be made simpler to use. After
investigating, I would like to suggest some improvements, listed below. If
more improvements are needed, please suggest them and I will incorporate
them when creating the Jira ticket.

In the current code for AbstractRabbitMQInputOperator, exchange and
exchangeType are marked NotNull, which prevents an app from being launched
without specifying these values. For an input operator, we are actually
trying to create queues and exchanges with the specified values. This leads
to conflicts in some scenarios: when the default exchange is used for the
queue, and when the queue is transient. To consume from RabbitMQ, the
operator needs only the queue name and the host and port of RabbitMQ.
Similar to KafkaInputOperator, we can let the operator fail if the queue
name is not specified and let the developer correct the application or
specify it in the configuration.

*Suggested Improvements:*

1) Drop the requirement to specify the exchange and its type.
2) We should not attempt to create the queue in the input operator. For a
consumer, the queue name alone is sufficient to start consuming data from
the queue.
3) Currently the queue name is optional; we should make it mandatory instead
of creating a queue.
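
A minimal sketch of improvement 3, assuming a hypothetical settings holder (`ConsumerSettings` and its `validate()` method are illustrative names, not part of AbstractRabbitMQInputOperator). Only the queue name, host and port are checked, while the exchange and its type may be left unset:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical settings holder for illustration; not the actual operator class.
final class ConsumerSettings {
    String host = "localhost";
    int port = 5672;       // default AMQP port
    String queueName;      // mandatory under the proposal
    String exchange;       // optional under the proposal
    String exchangeType;   // optional under the proposal

    // Returns the list of problems; an empty list means the settings are usable.
    List<String> validate() {
        List<String> problems = new ArrayList<>();
        if (queueName == null || queueName.trim().isEmpty()) {
            problems.add("queueName must be specified");
        }
        if (host == null || host.trim().isEmpty()) {
            problems.add("host must be specified");
        }
        if (port <= 0 || port > 65535) {
            problems.add("port must be in 1..65535");
        }
        return problems;
    }
}
```

With such a check, an app that omits the queue name could fail early with a clear message instead of ending up with a server-generated queue.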

I tried out the following scenarios to test the existing operator.

*Scenarios with default exchange:*
*1) Queue is already created as non-durable with default exchange*
*Setup:*
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=403,
reply-text=ACCESS_REFUSED - operation not permitted on the default
exchange, class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128

*Reason:*
The default exchange is specified in the code as “”. Although we are only
trying to consume from the specified, already created queue, the operator
crashes because the exchangeDeclare() call fails.
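
One way to avoid this failure, sketched under the assumption that the operator keeps declaring named exchanges, is to skip the declaration whenever the exchange is unset or is the default exchange “”. `ExchangeGuard` and `needsDeclare` are hypothetical names used for illustration, not existing operator code:

```java
// Hypothetical helper for illustration; not part of the existing operator.
final class ExchangeGuard {
    private ExchangeGuard() {}

    // The default exchange is addressed by the empty string and already exists
    // on the broker, so a consumer has nothing to declare for it.
    static boolean needsDeclare(String exchange) {
        return exchange != null && !exchange.isEmpty();
    }
}
```

The operator would then call exchangeDeclare() only when `needsDeclare` returns true.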

*2) Queue is not created before launching the app*

*Setup:*
rabbitMQInputOperator.setQueueName("test");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=403,
reply-text=ACCESS_REFUSED - operation not permitted on the default
exchange, class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128

*Reason:*
The current operator fails while trying to declare the default exchange.

*Scenarios with custom exchanges:*

*1) Queue “test_2” is already created as non-durable with exchange
“new_exchange” in RabbitMQ*
*Setup:*
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
The exception is caused by a mismatch in the “durable” parameter during
declaration:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=406,
reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange
'new_exchange' in vhost '/': received 'false' but current is 'true',
class-id=40, method-id=10)
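
*Reason:*
The existing queue was transient (non-durable), while the code tries to
create a durable queue with the same name. Note that the trace above
reports the 'durable' mismatch on the exchange 'new_exchange' (declared as
non-durable while the existing one is durable), so the exchange declaration
conflicts as well.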
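
The 406 failure follows from how redeclaration is checked: declaring an entity that already exists succeeds only if the arguments match. The toy model below illustrates that rule in plain Java. `ToyBroker` is a hypothetical in-memory stand-in, not the RabbitMQ client or broker, and it tracks only the durable flag:

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory stand-in for a broker; tracks only the 'durable' flag per entity name.
final class ToyBroker {
    private final Map<String, Boolean> durableByName = new HashMap<>();

    // Active declare: creates the entity, or fails if it exists with a different flag.
    void declare(String name, boolean durable) {
        Boolean current = durableByName.putIfAbsent(name, durable);
        if (current != null && current != durable) {
            throw new IllegalStateException(
                "PRECONDITION_FAILED - inequivalent arg 'durable' for '" + name
                + "': received '" + durable + "' but current is '" + current + "'");
        }
    }

    // Passive check: never creates or modifies anything, only reports existence.
    boolean exists(String name) {
        return durableByName.containsKey(name);
    }
}
```

A consumer that only checks for existence never hits the mismatch, which is the motivation for improvement 2. In the RabbitMQ Java client, the corresponding passive call is `Channel.queueDeclarePassive(String)`.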

*2) Queue and exchange are not created (declared) in RabbitMQ*
*Setup:*
No queue or exchange is present in RabbitMQ.
An app with the following configuration ends up creating a durable queue
“test_2” bound to a new exchange “new_exchange” of type “fanout”, with
routing key “”.

rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Result:*
Now external entities can push data to the newly created exchange and queue.

*3) Queue and exchange are already created in RabbitMQ, with the queue as
durable and the exchange and exchange type as specified*

*Setup:*

rabbitMQInputOperator.setQueueName("test");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setRoutingKey("test");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);
*Result:*
Worked with no issues, but it requires that a durable queue has already been
created with the proper exchange and routing key.

*4) Queue name is not specified in the app, but exchange and exchangeType
are specified for the operator*

*Setup:*
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setRoutingKey("test");
rabbitMQInputOperator.setPort(5672);

*Result:*
The operator ended up creating a queue with a unique generated name, such as
“amq.gen-dHDsywLO-8eV8qZM4Q4T_w”, which is auto-deleted when the last
consumer stops consuming from it.


Thanks & Regards,
Vikram
