[ https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15603840#comment-15603840 ]
ASF GitHub Bot commented on BAHIR-72:
-------------------------------------

Github user shijinkui commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/7#discussion_r84816760

    --- Diff: flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.netty.example
    +
    +import java.io.{BufferedReader, InputStreamReader}
    +import java.net._
    +
    +import org.apache.commons.lang3.SystemUtils
    +import org.mortbay.util.MultiException
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * Netty Utility class for start netty service and retry tcp port
    + */
    +object NettyUtil {
    +  private lazy val logger = LoggerFactory.getLogger(getClass)
    +
    +  /** find local inet addresses */
    +  def findLocalInetAddress(): InetAddress = {
    +
    +    val address = InetAddress.getLocalHost
    +    address.isLoopbackAddress match {
    +      case true =>
    +        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
    +        // a better address using the local network interfaces
    +        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
    +        // on unix-like system. On windows, it returns in index order.
    +        // It's more proper to pick ip address following system output order.
    +        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
    +        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
    +          case true => activeNetworkIFs
    +          case false => activeNetworkIFs.reverse
    +        }
    +
    +        reOrderedNetworkIFs.find { ni: NetworkInterface =>
    +          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
    +            addr.isLinkLocalAddress || addr.isLoopbackAddress
    +          }
    +          addr.nonEmpty
    +        } match {
    +          case Some(ni) =>
    +            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet =>
    +              inet.isLinkLocalAddress || inet.isLoopbackAddress
    +            }
    +            val address = addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
    +            // because of Inet6Address.toHostName may add interface at the end if it knows about it
    +            InetAddress.getByAddress(address)
    +          case None => address
    +        }
    +      case false => address
    +    }
    +  }
    +
    +  /** start service, if port is collision, retry 128 times */
    +  def startServiceOnPort[T](
    +    startPort: Int,
    +    startService: Int => T,
    +    maxRetries: Int = 128,
    +    serviceName: String = ""): T = {
    +
    +    if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
    +      throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
    +        "or 0 for a random free port.")
    +    }
    +
    +    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    +    for (offset <- 0 to maxRetries) {
    +      // Do not increment port if startPort is 0, which is treated as a special port
    +      val tryPort = if (startPort == 0) {
    +        startPort
    +      } else {
    +        // If the new port wraps around, do not try a privilege port
    +        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    +      }
    +
    +      try {
    +        val result = startService(tryPort)
    +        logger.info(s"Successfully started service$serviceString, result:$result.")
    +        return result
    +      } catch {
    +        case e: Exception if isBindCollision(e) =>
    +          if (offset >= maxRetries) {
    +            val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
    +              s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
    +              s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
    +              "port or increasing spark.port.maxRetries."
    --- End diff --

    @rmetzger OK, added


> support netty: pushed tcp/http connector
> ----------------------------------------
>
>                 Key: BAHIR-72
>                 URL: https://issues.apache.org/jira/browse/BAHIR-72
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>            Reporter: shijinkui
>
> Also discussed in Flink: https://issues.apache.org/jira/browse/FLINK-4630
> When the source stream starts, it listens on a provided TCP port and receives stream data
> from the user's data source.
> This Netty TCP source keeps the connection alive and is end-to-end, that is, data goes from
> the business system to the Flink worker directly.
> Such a source service is indeed needed in production.
> The source is described in detail below:
> 1. The source runs as a Netty TCP and HTTP server.
> 2. The user provides a TCP port; if the port is in use, the port number is increased
> within the range 1024 to 65535. The source can run in parallel.
> 3. The source calls back the provided URL to report the actual port it listens on.
> 4. The user pushes streaming data to the Netty server, which then collects the data into Flink.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
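
For readers following the port-retry rule in item 2 of the issue description, here is a minimal sketch of which ports would be tried, using the same wrap-around expression as `startServiceOnPort` in the diff. The names `PortRetrySketch` and `candidatePorts` are hypothetical and not part of the pull request. The sketch only enumerates candidate ports and binds nothing. It also assumes the intended range is 1024 to 65535 inclusive, as the error message in the diff states, so it rejects 65536.

```scala
object PortRetrySketch {
  // Lowest non-privileged port and the exclusive upper bound of the port range.
  val MinPort = 1024
  val PortLimit = 65536

  /** Ports tried for a start port: the start port itself plus up to maxRetries
    * successors, wrapping from 65535 back to 1024.
    * Hypothetical helper, for illustration only. */
  def candidatePorts(startPort: Int, maxRetries: Int): Seq[Int] = {
    // Assumption: 65536 is out of range, matching the diff's error message.
    require(startPort == 0 || (startPort >= MinPort && startPort < PortLimit),
      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
    (0 to maxRetries).map { offset =>
      // Port 0 asks the OS for any free port, so it is never incremented.
      if (startPort == 0) 0
      else ((startPort + offset - MinPort) % (PortLimit - MinPort)) + MinPort
    }
  }

  def main(args: Array[String]): Unit = {
    // prints "65534, 65535, 1024, 1025"
    println(candidatePorts(65534, 3).mkString(", "))
  }
}
```

Starting at 65534 with three retries, the sequence wraps past 65535 to 1024 rather than touching a privileged port, which is the behavior the comment in the diff describes.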