This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 262d485d56 =sbt Do retry connection in Player.
262d485d56 is described below

commit 262d485d568f1d9b0d802a7826b435f86971cc8d
Author: He-Pin <[email protected]>
AuthorDate: Tue Aug 8 16:44:24 2023 +0800

    =sbt Do retry connection in Player.
---
 .../apache/pekko/remote/testconductor/Player.scala | 21 ++++++++++++--
 .../remote/testconductor/RemoteConnection.scala    |  3 ++
 project/SbtMultiJvm.scala                          | 32 ----------------------
 3 files changed, 22 insertions(+), 34 deletions(-)

diff --git 
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
 
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
index 9697219ed0..3977a8b7dc 100644
--- 
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
+++ 
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
@@ -22,11 +22,13 @@ import scala.collection.immutable
 import scala.concurrent.{ Await, ExecutionContext, Future }
 import scala.concurrent.duration._
 import scala.reflect.classTag
+import scala.util.{ Failure, Success, Try }
 import scala.util.control.NoStackTrace
 import scala.util.control.NonFatal
 
 import io.netty.channel.{ Channel, ChannelHandlerContext, 
ChannelInboundHandlerAdapter }
 import io.netty.channel.ChannelHandler.Sharable
+
 import org.apache.pekko
 import pekko.actor._
 import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
@@ -338,21 +340,36 @@ private[pekko] class PlayerHandler(
 
   import ClientFSM._
 
-  val connectionRef: AtomicReference[RemoteConnection] = new 
AtomicReference[RemoteConnection](reconnect())
+  val connectionRef: AtomicReference[RemoteConnection] = new 
AtomicReference[RemoteConnection]()
 
   var nextAttempt: Deadline = _
 
+  tryConnectToController()
+
   @nowarn("msg=deprecated")
   override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): 
Unit = {
     log.error("channel {} exception {}", ctx.channel(), cause)
     cause match {
       case _: ConnectException if reconnects > 0 =>
         reconnects -= 1
-        
scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect()))
       case e => fsm ! ConnectionFailure(e.getMessage)
     }
   }
 
+  private def tryConnectToController(): Unit = {
+    Try(reconnect()) match {
+      case Success(r) => connectionRef.set(r)
+      case Failure(ex) =>
+        log.error("Error when try to connect to remote addr:[{}] will retry, 
time left:[{}], cause:[{}].",
+          server, nextAttempt.timeLeft, ex.getMessage)
+        scheduleReconnect()
+    }
+  }
+
+  private def scheduleReconnect(): Unit = {
+    scheduler.scheduleOnce(nextAttempt.timeLeft)(tryConnectToController())
+  }
+
   private def reconnect(): RemoteConnection = {
     nextAttempt = Deadline.now + backoff
     RemoteConnection(Client, server, poolSize, this)
diff --git 
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
 
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
index 945deff836..99801878bf 100644
--- 
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
+++ 
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
@@ -31,6 +31,7 @@ import io.netty.handler.codec.{
   MessageToMessageDecoder,
   MessageToMessageEncoder
 }
+
 import org.apache.pekko
 import pekko.protobufv3.internal.Message
 import pekko.util.Helpers
@@ -130,6 +131,8 @@ private[pekko] object RemoteConnection {
           .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
           .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
           .connect(sockaddr)
+          .sync()
+
         new RemoteConnection {
           override def channelFuture: ChannelFuture = cf
 
diff --git a/project/SbtMultiJvm.scala b/project/SbtMultiJvm.scala
index b05cf84701..e83c038746 100644
--- a/project/SbtMultiJvm.scala
+++ b/project/SbtMultiJvm.scala
@@ -22,9 +22,6 @@ import sbtassembly.AssemblyPlugin.assemblySettings
 import sbtassembly.{ AssemblyKeys, MergeStrategy }
 import AssemblyKeys._
 
-import java.net.{ InetSocketAddress, Socket }
-import java.util.concurrent.TimeUnit
-
 object MultiJvmPlugin extends AutoPlugin {
 
   case class Options(jvm: Seq[String], extra: String => Seq[String], run: 
String => Seq[String])
@@ -375,40 +372,11 @@ object MultiJvmPlugin extends AutoPlugin {
         log.debug("Starting %s for %s".format(jvmName, testClass))
         log.debug("  with JVM options: %s".format(allJvmOptions.mkString(" ")))
         val testClass2Process = (testClass, Jvm.startJvm(javaBin, 
allJvmOptions, runOptions, jvmLogger, connectInput))
-        if (index == 0) {
-          log.debug("%s for %s 's started as `Controller`, waiting before can 
be connected for clients.".format(jvmName,
-            testClass))
-          val controllerHost = hosts.head
-          val serverPort: Int = Integer.getInteger("multinode.server-port", 
4711)
-          waitingBeforeConnectable(controllerHost, serverPort, 
TimeUnit.SECONDS.toMillis(20L))
-        }
         testClass2Process
     }
     processExitCodes(name, processes, log)
   }
 
-  private def waitingBeforeConnectable(host: String, port: Int, 
timeoutInMillis: Long): Unit = {
-    val inetSocketAddress = new InetSocketAddress(host, port)
-    def telnet(addr: InetSocketAddress, timeout: Int): Boolean = {
-      val socket: Socket = new Socket()
-      try {
-        socket.connect(inetSocketAddress, timeout)
-        socket.isConnected
-      } catch {
-        case _: Exception => false
-      } finally {
-        socket.close()
-      }
-    }
-
-    val startTime = System.currentTimeMillis()
-    var connectivity = false
-    while (!connectivity && (System.currentTimeMillis() - startTime < 
timeoutInMillis)) {
-      connectivity = telnet(inetSocketAddress, 1000)
-      TimeUnit.MILLISECONDS.sleep(100)
-    }
-  }
-
   def processExitCodes(name: String, processes: Seq[(String, Process)], log: 
Logger): (String, sbt.TestResult) = {
     val exitCodes = processes.map {
       case (testClass, process) => (testClass, process.exitValue())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to