[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15603840#comment-15603840
 ] 

ASF GitHub Bot commented on BAHIR-72:
-------------------------------------

Github user shijinkui commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/7#discussion_r84816760
  
    --- Diff: 
flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
 ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.netty.example
    +
    +import java.io.{BufferedReader, InputStreamReader}
    +import java.net._
    +
    +import org.apache.commons.lang3.SystemUtils
    +import org.mortbay.util.MultiException
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * Netty Utility class for start netty service and retry tcp port
    + */
    +object NettyUtil {
    +  private lazy val logger = LoggerFactory.getLogger(getClass)
    +
    +  /** find local inet addresses */
    +  def findLocalInetAddress(): InetAddress = {
    +
    +    val address = InetAddress.getLocalHost
    +    address.isLoopbackAddress match {
    +      case true =>
    +        // Address resolves to something like 127.0.1.1, which happens on 
Debian; try to find
    +        // a better address using the local network interfaces
    +        // getNetworkInterfaces returns ifs in reverse order compared to 
ifconfig output order
    +        // on unix-like system. On windows, it returns in index order.
    +        // It's more proper to pick ip address following system output 
order.
    +        val activeNetworkIFs = 
NetworkInterface.getNetworkInterfaces.asScala.toSeq
    +        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
    +          case true => activeNetworkIFs
    +          case false => activeNetworkIFs.reverse
    +        }
    +
    +        reOrderedNetworkIFs.find { ni: NetworkInterface =>
    +          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
    +            addr.isLinkLocalAddress || addr.isLoopbackAddress
    +          }
    +          addr.nonEmpty
    +        } match {
    +          case Some(ni) =>
    +            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet 
=>
    +              inet.isLinkLocalAddress || inet.isLoopbackAddress
    +            }
    +            val address = 
addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
    +            // because of Inet6Address.toHostName may add interface at the 
end if it knows about it
    +            InetAddress.getByAddress(address)
    +          case None => address
    +        }
    +      case false => address
    +    }
    +  }
    +
    +  /** start service, if port is collision, retry 128 times */
    +  def startServiceOnPort[T](
    +    startPort: Int,
    +    startService: Int => T,
    +    maxRetries: Int = 128,
    +    serviceName: String = ""): T = {
    +
    +    if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
    +      throw new Exception("startPort should be between 1024 and 65535 
(inclusive), " +
    +        "or 0 for a random free port.")
    +    }
    +
    +    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    +    for (offset <- 0 to maxRetries) {
    +      // Do not increment port if startPort is 0, which is treated as a 
special port
    +      val tryPort = if (startPort == 0) {
    +        startPort
    +      } else {
    +        // If the new port wraps around, do not try a privilege port
    +        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    +      }
    +
    +      try {
    +        val result = startService(tryPort)
    +        logger.info(s"Successfully started service$serviceString, 
result:$result.")
    +        return result
    +      } catch {
    +        case e: Exception if isBindCollision(e) =>
    +          if (offset >= maxRetries) {
    +            val exceptionMessage = s"${e.getMessage}: 
Service$serviceString failed after " +
    +              s"$maxRetries retries! Consider explicitly setting the 
appropriate port for the " +
    +              s"service$serviceString (for example spark.ui.port for 
SparkUI) to an available " +
    +              "port or increasing spark.port.maxRetries."
    --- End diff --
    
    @rmetzger OK, added


> support netty: pushed tcp/http connector
> ----------------------------------------
>
>                 Key: BAHIR-72
>                 URL: https://issues.apache.org/jira/browse/BAHIR-72
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>            Reporter: shijinkui
>
> Also discuss from flink: https://issues.apache.org/jira/browse/FLINK-4630
> When source stream get start, listen a provided tcp port, receive stream data 
> from user data source.
> This netty tcp source is keepping alive and end-to-end, that is from business 
> system to flink worker directly. 
> Such source service is needed in produce indeed.
> describe the source in detail below:
> 1.    source run as a netty tcp and http server
> 2.    user provide a tcp port, if the port is in used, increace the port 
> number between 1024 to 65535. Source can parallel.
> 3.    callback the provided url to report the real port to listen
> 4.    user push streaming data to netty server, then collect the data to flink



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to