[
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502001#comment-16502001
]
ASF GitHub Bot commented on FLINK-8468:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5410#discussion_r193127128
--- Diff:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
- channel.basicPublish("", queueName, null, msg);
+ if (messageCompute == null) {
+ channel.basicPublish("", queueName, null, msg);
+ } else {
+ String rk = messageCompute.computeRoutingKey(value);
+ String exchange = messageCompute.computeExchange(value);
+ channel.basicPublish((exchange != null) ? exchange : "",
+ (rk != null) ? rk : "",
+ (returnListener != null) && messageCompute.computeMandatory(value),
--- End diff --
I would prefer that an `IllegalStateException` is thrown if the user code
returns `true` from `computeMandatory` without a `returnListener` being set.
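A minimal sketch of that check, written as a standalone helper so it can be read without the rest of the sink. The class name `MandatoryCheck`, the method `resolveMandatory`, and the exception message are hypothetical and not part of the pull request; only the condition itself (mandatory requested while no `returnListener` is configured) comes from the comment above.

```java
public class MandatoryCheck {

    /**
     * Hypothetical helper: returns the mandatory flag to pass to basicPublish,
     * failing fast instead of silently downgrading the flag to false.
     *
     * @param mandatoryRequested value returned by the user's computeMandatory(value)
     * @param hasReturnListener  whether a returnListener was configured on the sink
     */
    static boolean resolveMandatory(boolean mandatoryRequested, boolean hasReturnListener) {
        if (mandatoryRequested && !hasReturnListener) {
            // Unroutable mandatory messages are returned to the publisher;
            // without a listener nothing would handle them, so treat this as misconfiguration.
            throw new IllegalStateException(
                "computeMandatory returned true, but no returnListener is set");
        }
        return mandatoryRequested;
    }
}
```

In the diff above, the third argument to `basicPublish` would then become something like `resolveMandatory(messageCompute.computeMandatory(value), returnListener != null)`, so a misconfigured sink surfaces as an exception rather than as a message published with `mandatory = false`.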
> Make the connector to take advantage of AMQP features (routing key, exchange
> and message properties)
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
> Issue Type: Improvement
> Components: RabbitMQ Connector
> Affects Versions: 1.4.0
> Reporter: Ph.Duveau
> Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor
> and an interface to implement
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)