[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308485#comment-15308485 ]
ASF GitHub Bot commented on FLINK-3763: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65251121 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** + * + * @param host host name + * @param port port + * @param virtualHost virtual host + * @param username username + * @param password password + + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** + * + * @param uri the connection URI + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if URI is null + */ + private RMQConnectionConfig(String uri, int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(uri, "Uri can not be null"); + this.uri = uri; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** @return the host to use for connections */ + public String getHost() { + return host; + } + + /** @return the port to use for connections */ + public int getPort() { + return port; + } + + /** + * Retrieve the virtual host. + * @return the virtual host to use when connecting to the broker + */ + public String getVirtualHost() { + return virtualHost; + } + + /** + * Retrieve the user name. + * @return the AMQP user name to use when connecting to the broker + */ + public String getUsername() { + return username; + } + + /** + * Retrieve the password. + * @return the password to use when connecting to the broker + */ + public String getPassword() { + return password; + } + + /** + * Retrieve the URI. + * @return the connection URI when connecting to the broker + */ + public String getUri() { + return uri; + } + + /** + * Returns automatic connection recovery interval in milliseconds. + * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000 + */ + public int getNetworkRecoveryInterval() { + return networkRecoveryInterval; + } + + /** + * Returns true if automatic connection recovery is enabled, false otherwise + * @return true if automatic connection recovery is enabled, false otherwise + */ + public boolean isAutomaticRecovery() { + return automaticRecovery; + } + + /** + * Returns true if topology recovery is enabled, false otherwise + * @return true if topology recovery is enabled, false otherwise + */ + public boolean isTopologyRecovery() { + return topologyRecovery; + } + + /** + * Retrieve the connection timeout. + * @return the connection timeout, in milliseconds; zero for infinite + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Retrieve the requested maximum channel number + * @return the initially requested maximum channel number; zero for unlimited + */ + public int getRequestedChannelMax() { + return requestedChannelMax; + } + + /** + * Retrieve the requested maximum frame size + * @return the initially requested maximum frame size, in octets; zero for unlimited + */ + public int getRequestedFrameMax() { + return requestedFrameMax; + } + + /** + * Retrieve the requested heartbeat interval. + * @return the initially requested heartbeat interval, in seconds; zero for none + */ + public int getRequestedHeartbeat() { + return requestedHeartbeat; + } + + /** + * + * @return Connection Factory for RMQ + * @throws Exception if Malformed URI has been passed + */ + public ConnectionFactory getConnectionFactory() throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + if (this.uri != null && !this.uri.isEmpty()){ + try { + factory.setUri(getUri()); + }catch (Exception e){ + LOG.error("Failed to parse uri {}", e.getMessage()); + throw e; + } + } else { + factory.setHost(getHost()); + factory.setPort(getPort()); + factory.setVirtualHost(getVirtualHost()); + factory.setUsername(getUsername()); + factory.setPassword(getPassword()); + } + + factory.setAutomaticRecoveryEnabled(isAutomaticRecovery()); + factory.setConnectionTimeout(getConnectionTimeout()); + factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval()); + factory.setRequestedHeartbeat(getRequestedHeartbeat()); + factory.setTopologyRecoveryEnabled(isTopologyRecovery()); + factory.setRequestedChannelMax(getRequestedChannelMax()); + factory.setRequestedFrameMax(getRequestedFrameMax()); + + return factory; + } + + public static class Builder { + /** The default host */ + public static final String DEFAULT_HOST = "localhost"; + + /** 'Use the default port' port */ + public static final int USE_DEFAULT_PORT = -1; + + /** Default virtual host */ + public static final String DEFAULT_VHOST = "/"; + + /** Default user name */ + public static final String DEFAULT_USER = "guest"; + + /** Default password */ + public static final String DEFAULT_PASS = "guest"; + + /** The default connection timeout; + * zero means wait indefinitely */ + public static final int DEFAULT_CONNECTION_TIMEOUT = 0; + + /** Default maximum channel number; + * zero for unlimited */ + public static final int DEFAULT_CHANNEL_MAX = 0; + + /** Default maximum frame size; + * zero means no limit */ + public static final int DEFAULT_FRAME_MAX = 0; + + /** Default heart-beat interval; + * zero means no heart-beats */ + public static final int DEFAULT_HEARTBEAT = 0; + + private String host = DEFAULT_HOST; + private int port = USE_DEFAULT_PORT; + private String virtualHost = DEFAULT_VHOST; + private String username = DEFAULT_USER; + private String password = DEFAULT_PASS; + + private int networkRecoveryInterval = 5000; + private boolean automaticRecovery = false; + private boolean topologyRecovery = true; + + private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; + private int requestedChannelMax = DEFAULT_CHANNEL_MAX; + private int requestedFrameMax = DEFAULT_FRAME_MAX; + private int requestedHeartbeat = DEFAULT_HEARTBEAT; --- End diff -- This formatting is not consistent with the rest of the Flink codebase. Can you just use a space before and after the equals sign? > RabbitMQ Source/Sink standardize connection parameters > ------------------------------------------------------ > > Key: FLINK-3763 > URL: https://issues.apache.org/jira/browse/FLINK-3763 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors > Affects Versions: 1.0.1 > Reporter: Robert Batts > Assignee: Subhankar Biswas > > The RabbitMQ source and sink should have the same capabilities in terms of > establishing a connection, currently the sink is lacking connection > parameters that are available on the source. Additionally, VirtualHost should > be an offered parameter for multi-tenant RabbitMQ clusters (if not specified > it goes to the vhost '/'). > Connection Parameters > =================== > - Host - Offered on both > - Port - Source only > - Virtual Host - Neither > - User - Source only > - Password - Source only > Additionally, it might be worth offer the URI as a valid constructor because > that would offer all 5 of the above parameters in a single String. -- This message was sent by Atlassian JIRA (v6.3.4#6332)