[
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503067#comment-16503067
]
ASF GitHub Bot commented on FLINK-8468:
---------------------------------------
Github user pduveau commented on a diff in the pull request:
https://github.com/apache/flink/pull/5410#discussion_r193356675
--- Diff:
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
---
@@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
verify(connection).close();
}
+ @Test
+ public void invokeFeaturedPublishBytesToQueue() throws Exception {
+ RMQSink<String> rmqSink = createRMQSinkFeatured();
+
+ rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+ verify(serializationSchema).serialize(MESSAGE_STR);
+ verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false,
false,
+ publishOptions.computeProperties(""), MESSAGE);
+ }
+
+ @Test
+ public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws
Exception {
+ RMQSink<String> rmqSink = createRMQSinkFeaturedReturnHandler();
+
+ rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+ verify(serializationSchema).serialize(MESSAGE_STR);
+ verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
+ publishOptions.computeProperties(""), MESSAGE);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void exceptionDuringFeaturedPublishingIsNotIgnored() throws
Exception {
+ RMQSink<String> rmqSink = createRMQSinkFeatured();
+
+ doThrow(IOException.class).when(channel).basicPublish(EXCHANGE,
ROUTING_KEY, false, false,
+ publishOptions.computeProperties(""), MESSAGE);
+ rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+ }
+
+ @Test
+ public void
exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+ RMQSink<String> rmqSink = createRMQSinkFeatured();
+ rmqSink.setLogFailuresOnly(true);
+
+ doThrow(IOException.class).when(channel).basicPublish(EXCHANGE,
ROUTING_KEY, false, false,
+ publishOptions.computeProperties(""), MESSAGE);
+ rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+ }
+
+ private class DummyPublishOptions implements
RMQSinkPublishOptions<String> {
--- End diff --
Done
> Make the connector to take advantage of AMQP features (routing key, exchange
> and message properties)
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
> Issue Type: Improvement
> Components: RabbitMQ Connector
> Affects Versions: 1.4.0
> Reporter: Ph.Duveau
> Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor
> and an interface to implement
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)