Repository: flink Updated Branches: refs/heads/master 776253cbb -> 6cc1c179a
[FLINK-3763] Some RMQ cleanups This closes #2054 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cc1c179 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cc1c179 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cc1c179 Branch: refs/heads/master Commit: 6cc1c179af5e48b97d66fd3bcbd23f6704d43f62 Parents: 86a8033 Author: Robert Metzger <[email protected]> Authored: Thu Jun 9 13:31:12 2016 +0200 Committer: Robert Metzger <[email protected]> Committed: Thu Jun 9 15:16:32 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/rabbitmq/RMQSink.java | 72 +++++++++++++------- .../connectors/rabbitmq/RMQSource.java | 17 ++--- .../rabbitmq/common/RMQConnectionConfig.java | 14 ++-- 3 files changed, 59 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6cc1c179/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index bf0cef7..6473164 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -30,6 +30,10 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +/** + * A Sink for publishing data into RabbitMQ + * @param <IN> + */ public class RMQSink<IN> extends RichSinkFunction<IN> { private static final long serialVersionUID = 1L; @@ -40,6 +44,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { private transient Connection connection; private transient Channel channel; private SerializationSchema<IN> schema; + private boolean logFailuresOnly = false; /** * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}. @@ -53,6 +58,31 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { } /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + + @Override + public void open(Configuration config) throws Exception { + ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory(); + try { + connection = factory.newConnection(); + channel = connection.createChannel(); + channel.queueDeclare(queueName, false, false, false, null); + } catch (IOException e) { + throw new RuntimeException("Error while creating the channel", e); + } + } + + /** * Called when new data arrives to the sink, and forwards it to RMQ. * * @param value @@ -64,45 +94,37 @@ public class RMQSink<IN> extends RichSinkFunction<IN> { byte[] msg = schema.serialize(value); channel.basicPublish("", queueName, null, msg); - } catch (IOException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost()); + if (logFailuresOnly) { + LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e); + } else { + throw new RuntimeException("Cannot send RMQ message " + queueName +" at " + rmqConnectionConfig.getHost(), e); } } } - /** - * Closes the connection. - */ - private void closeChannel() { + @Override + public void close() { + IOException t = null; try { channel.close(); - connection.close(); } catch (IOException e) { - throw new RuntimeException("Error while closing RMQ connection with " + queueName - + " at " + rmqConnectionConfig.getHost(), e); + t = e; } - } - - @Override - public void open(Configuration config) throws Exception { - ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory(); try { - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.queueDeclare(queueName, false, false, false, null); - + connection.close(); } catch (IOException e) { - throw new RuntimeException(e); + if(t != null) { + LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t); + } + t = e; + } + if(t != null) { + throw new RuntimeException("Error while closing RMQ connection with " + queueName + + " at " + rmqConnectionConfig.getHost(), t); } - } - - @Override - public void close() { - closeChannel(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6cc1c179/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 8297f9c..20a8b0b 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -141,10 +141,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU channel.queueDeclare(queueName, true, false, false, null); } - /** - * Initializes the connection to RMQ. - */ - private void initializeConnection() throws Exception { + @Override + public void open(Configuration config) throws Exception { + super.open(config); ConnectionFactory factory = setupConnectionFactory(); try { connection = factory.newConnection(); @@ -154,7 +153,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU RuntimeContext runtimeContext = getRuntimeContext(); if (runtimeContext instanceof StreamingRuntimeContext - && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) { + && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) { autoAck = false; // enables transaction mode channel.txSelect(); @@ -167,14 +166,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU } catch (IOException e) { throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at " - + rmqConnectionConfig.getHost(), e); + + rmqConnectionConfig.getHost(), e); } - } - - @Override - public void open(Configuration config) throws Exception { - super.open(config); - initializeConnection(); running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/6cc1c179/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java index 0ce7e79..72bac1c 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java @@ -73,7 +73,7 @@ public class RMQConnectionConfig implements Serializable { * @param requestedFrameMax requested maximum frame size * @param requestedHeartbeat requested heartbeat interval * @throws NullPointerException if host or virtual host or username or password is null - */ + */ private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password, Integer networkRecoveryInterval, Boolean automaticRecovery, Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax, @@ -162,7 +162,7 @@ public class RMQConnectionConfig implements Serializable { /** * Retrieve the URI. * @return the connection URI when connecting to the broker - */ + */ public String getUri() { return uri; } @@ -227,15 +227,15 @@ public class RMQConnectionConfig implements Serializable { * * @return Connection Factory for RMQ * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed - */ + */ public ConnectionFactory getConnectionFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { ConnectionFactory factory = new ConnectionFactory(); if (this.uri != null && !this.uri.isEmpty()){ try { - factory.setUri(getUri()); - }catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e){ - LOG.error("Failed to parse uri {}", e.getMessage()); + factory.setUri(this.uri); + } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) { + LOG.error("Failed to parse uri", e); throw e; } } else { @@ -432,7 +432,7 @@ public class RMQConnectionConfig implements Serializable { * else URI will be used to initialize the client connection * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)} * @return RMQConnectionConfig - */ + */ public RMQConnectionConfig build(){ if(this.uri != null) { return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
