This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 262d485d56 =sbt Do retry connection in Player.
262d485d56 is described below
commit 262d485d568f1d9b0d802a7826b435f86971cc8d
Author: He-Pin <[email protected]>
AuthorDate: Tue Aug 8 16:44:24 2023 +0800
=sbt Do retry connection in Player.
---
.../apache/pekko/remote/testconductor/Player.scala | 21 ++++++++++++--
.../remote/testconductor/RemoteConnection.scala | 3 ++
project/SbtMultiJvm.scala | 32 ----------------------
3 files changed, 22 insertions(+), 34 deletions(-)
diff --git
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
index 9697219ed0..3977a8b7dc 100644
---
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
+++
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala
@@ -22,11 +22,13 @@ import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import scala.reflect.classTag
+import scala.util.{ Failure, Success, Try }
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import io.netty.channel.{ Channel, ChannelHandlerContext,
ChannelInboundHandlerAdapter }
import io.netty.channel.ChannelHandler.Sharable
+
import org.apache.pekko
import pekko.actor._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
@@ -338,21 +340,36 @@ private[pekko] class PlayerHandler(
import ClientFSM._
- val connectionRef: AtomicReference[RemoteConnection] = new
AtomicReference[RemoteConnection](reconnect())
+ val connectionRef: AtomicReference[RemoteConnection] = new
AtomicReference[RemoteConnection]()
var nextAttempt: Deadline = _
+ tryConnectToController()
+
@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable):
Unit = {
log.error("channel {} exception {}", ctx.channel(), cause)
cause match {
case _: ConnectException if reconnects > 0 =>
reconnects -= 1
-
scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect()))
case e => fsm ! ConnectionFailure(e.getMessage)
}
}
+ private def tryConnectToController(): Unit = {
+ Try(reconnect()) match {
+ case Success(r) => connectionRef.set(r)
+ case Failure(ex) =>
+ log.error("Error when try to connect to remote addr:[{}] will retry,
time left:[{}], cause:[{}].",
+ server, nextAttempt.timeLeft, ex.getMessage)
+ scheduleReconnect()
+ }
+ }
+
+ private def scheduleReconnect(): Unit = {
+ scheduler.scheduleOnce(nextAttempt.timeLeft)(tryConnectToController())
+ }
+
private def reconnect(): RemoteConnection = {
nextAttempt = Deadline.now + backoff
RemoteConnection(Client, server, poolSize, this)
diff --git
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
index 945deff836..99801878bf 100644
---
a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
+++
b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala
@@ -31,6 +31,7 @@ import io.netty.handler.codec.{
MessageToMessageDecoder,
MessageToMessageEncoder
}
+
import org.apache.pekko
import pekko.protobufv3.internal.Message
import pekko.util.Helpers
@@ -130,6 +131,8 @@ private[pekko] object RemoteConnection {
.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
.connect(sockaddr)
+ .sync()
+
new RemoteConnection {
override def channelFuture: ChannelFuture = cf
diff --git a/project/SbtMultiJvm.scala b/project/SbtMultiJvm.scala
index b05cf84701..e83c038746 100644
--- a/project/SbtMultiJvm.scala
+++ b/project/SbtMultiJvm.scala
@@ -22,9 +22,6 @@ import sbtassembly.AssemblyPlugin.assemblySettings
import sbtassembly.{ AssemblyKeys, MergeStrategy }
import AssemblyKeys._
-import java.net.{ InetSocketAddress, Socket }
-import java.util.concurrent.TimeUnit
-
object MultiJvmPlugin extends AutoPlugin {
case class Options(jvm: Seq[String], extra: String => Seq[String], run:
String => Seq[String])
@@ -375,40 +372,11 @@ object MultiJvmPlugin extends AutoPlugin {
log.debug("Starting %s for %s".format(jvmName, testClass))
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
val testClass2Process = (testClass, Jvm.startJvm(javaBin,
allJvmOptions, runOptions, jvmLogger, connectInput))
- if (index == 0) {
- log.debug("%s for %s 's started as `Controller`, waiting before can
be connected for clients.".format(jvmName,
- testClass))
- val controllerHost = hosts.head
- val serverPort: Int = Integer.getInteger("multinode.server-port",
4711)
- waitingBeforeConnectable(controllerHost, serverPort,
TimeUnit.SECONDS.toMillis(20L))
- }
testClass2Process
}
processExitCodes(name, processes, log)
}
- private def waitingBeforeConnectable(host: String, port: Int,
timeoutInMillis: Long): Unit = {
- val inetSocketAddress = new InetSocketAddress(host, port)
- def telnet(addr: InetSocketAddress, timeout: Int): Boolean = {
- val socket: Socket = new Socket()
- try {
- socket.connect(inetSocketAddress, timeout)
- socket.isConnected
- } catch {
- case _: Exception => false
- } finally {
- socket.close()
- }
- }
-
- val startTime = System.currentTimeMillis()
- var connectivity = false
- while (!connectivity && (System.currentTimeMillis() - startTime <
timeoutInMillis)) {
- connectivity = telnet(inetSocketAddress, 1000)
- TimeUnit.MILLISECONDS.sleep(100)
- }
- }
-
def processExitCodes(name: String, processes: Seq[(String, Process)], log:
Logger): (String, sbt.TestResult) = {
val exitCodes = processes.map {
case (testClass, process) => (testClass, process.exitValue())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]