[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308479#comment-15308479 ]
ASF GitHub Bot commented on FLINK-3763: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65250839 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. 
+ * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** + * + * @param host host name + * @param port port + * @param virtualHost virtual host + * @param username username + * @param password password + + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, 
"host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** + * + * @param uri the connection URI + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if URI is null --- End diff -- It seems that the indentation here is done using tabs and spaces. > RabbitMQ Source/Sink standardize connection parameters > ------------------------------------------------------ > > Key: FLINK-3763 > URL: https://issues.apache.org/jira/browse/FLINK-3763 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors > Affects Versions: 1.0.1 > Reporter: Robert Batts > Assignee: Subhankar Biswas > > The RabbitMQ source and sink should have the same capabilities in terms of > establishing a connection, currently the sink is lacking connection > parameters that are available on the source. Additionally, VirtualHost should > be an offered parameter for multi-tenant RabbitMQ clusters (if not specified > it goes to the vhost '/'). 
> Connection Parameters > =================== > - Host - Offered on both > - Port - Source only > - Virtual Host - Neither > - User - Source only > - Password - Source only > Additionally, it might be worth offering the URI as a valid constructor because > that would offer all 5 of the above parameters in a single String. -- This message was sent by Atlassian JIRA (v6.3.4#6332)