This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 93fec3e333 fix: serialize async DNS request id generation (#2958)
93fec3e333 is described below

commit 93fec3e3333aba93decf498f1d6b72abdc4b5ce7
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 10 02:19:38 2026 +0800

    fix: serialize async DNS request id generation (#2958)
    
    Motivation:
    
    Async DNS resolution allocated request ids from future callbacks, which
    makes the id generator concurrency contract harder to reason about and can
    race when duplicate ids are retried.
    
    Modification:
    
    Add a request-id injector actor that owns id generation and duplicate-id
    retries while preserving existing in-flight resolution de-duplication and
    full-question DropRequest handling. Add focused tests for deterministic
    duplicate-id retries, pending request ids, and per-attempt retry timeout
    behavior.
    
    Result:
    
    Async DNS request ids are generated and retried through an actor-owned
    path, and retry behavior is covered by regression tests.
    
    Tests:
    
    - scalafmt --mode diff-ref=origin/main / passed
    
    - scalafmt --list --mode diff-ref=origin/main / passed
    
    - git diff --check / passed
    
    - sbt "actor-tests / Test / testOnly org.apache.pekko.io.dns.internal.AsyncDnsResolverSpec" / passed
    
    - sbt "actor-tests / Test / testOnly org.apache.pekko.io.dns.internal.AsyncDnsResolverSpec org.apache.pekko.io.dns.internal.DnsClientSpec" / passed
    
    References:
    
    Refs upstream https://github.com/akka/akka-core/commit/ef33e71ac9ce0b5041adcbb57d210c7d029712b6,
    which is now Apache licensed.
---
 .../io/dns/internal/AsyncDnsResolverSpec.scala     |  88 ++++++++++++++-
 .../pekko/io/dns/internal/AsyncDnsResolver.scala   | 119 ++++++++++++++++-----
 2 files changed, 178 insertions(+), 29 deletions(-)

diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
 
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
index a3dfb031ac..13c40e872d 100644
--- 
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
+++ 
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
@@ -97,6 +97,8 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
     }
 
     "handle duplicate Ids in dnsClient" in new Setup {
+      override val r = resolver(List(dnsClient1.ref, dnsClient2.ref), 
defaultConfig, deterministicIds(1, 2))
+
       r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
       val firstId = dnsClient1.expectMsgPF() {
         case q4: Question4 if q4.name == "cats.com" =>
@@ -317,6 +319,11 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
     }
 
     "not reuse the request ids of pending requests" in new Setup {
+      override val r = resolver(
+        List(dnsClient1.ref, dnsClient2.ref),
+        defaultConfig,
+        deterministicIds((1 to 10).map(_.toShort): _*))
+
       // Send multiple resolves for different names so no in-flight 
deduplication applies
       val resolveCount = 10
       (1 to resolveCount).foreach { i =>
@@ -324,12 +331,71 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
       }
 
       // Each resolve should have received a Question4 from dnsClient1 with a 
unique ID
-      val receivedIds = (1 to resolveCount).map { _ =>
-        dnsClient1.expectMsgPF(remainingOrDefault) {
+      val receivedQuestions = (1 to resolveCount).map { _ =>
+        val id = dnsClient1.expectMsgPF(remainingOrDefault) {
           case Question4(id, _) => id
         }
+        id -> dnsClient1.lastSender
+      }
+      receivedQuestions.map(_._1).toSet.size shouldBe resolveCount
+
+      receivedQuestions.foreach {
+        case (id, replyTo) => replyTo ! Answer(id, im.Seq.empty)
+      }
+      (1 to resolveCount).foreach { _ =>
+        senderProbe.expectMsgType[Resolved]
+      }
+    }
+
+    "retry request ids that duplicate an in-flight request" in new Setup {
+      override val r = resolver(List(dnsClient1.ref), defaultConfig, 
deterministicIds(1, 1, 2))
+
+      val asker1 = TestProbe()
+      val asker2 = TestProbe()
+
+      r.tell(Resolve("host1.cats.com", Ip(ipv4 = true, ipv6 = false)), 
asker1.ref)
+      val firstQuestion = dnsClient1.expectMsgPF() {
+        case q: Question4 if q.name == "host1.cats.com" => q
+      }
+      val firstQuestionSender = dnsClient1.lastSender
+
+      r.tell(Resolve("host2.cats.com", Ip(ipv4 = true, ipv6 = false)), 
asker2.ref)
+      val duplicatedQuestion = dnsClient1.expectMsgPF() {
+        case q: Question4 if q.name == "host2.cats.com" && q.id == 
firstQuestion.id => q
+      }
+      dnsClient1.reply(DuplicateId(duplicatedQuestion.id))
+
+      val retriedQuestion = dnsClient1.expectMsgPF() {
+        case q: Question4 if q.name == "host2.cats.com" && q.id != 
duplicatedQuestion.id => q
+      }
+
+      firstQuestionSender ! Answer(firstQuestion.id, im.Seq.empty)
+      dnsClient1.reply(Answer(retriedQuestion.id, im.Seq.empty))
+
+      asker1.expectMsg(Resolved("host1.cats.com", im.Seq.empty))
+      asker2.expectMsg(Resolved("host2.cats.com", im.Seq.empty))
+    }
+
+    "allow duplicate id retries to use their own resolver timeout" in new 
Setup {
+      val config = defaultConfig.withValue("resolve-timeout", 
ConfigValueFactory.fromAnyRef("800 ms"))
+      override val r = resolver(List(dnsClient1.ref), config, 
deterministicIds(1, 2))
+
+      r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
+      val firstQuestion = dnsClient1.expectMsgPF() {
+        case q: Question4 if q.name == "cats.com" => q
       }
-      receivedIds.toSet.size shouldBe resolveCount
+
+      dnsClient1.expectNoMessage(500.millis)
+      dnsClient1.reply(DuplicateId(firstQuestion.id))
+
+      val retriedQuestion = dnsClient1.expectMsgPF() {
+        case q: Question4 if q.name == "cats.com" && q.id != firstQuestion.id 
=> q
+      }
+
+      dnsClient1.expectNoMessage(450.millis)
+      dnsClient1.reply(Answer(retriedQuestion.id, im.Seq.empty))
+
+      senderProbe.expectMsg(Resolved("cats.com", im.Seq.empty))
     }
 
     "reuse in-progress resolutions" in new Setup {
@@ -355,11 +421,23 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
     }
   }
 
-  def resolver(clients: List[ActorRef], config: Config): ActorRef = {
+  private def deterministicIds(ids: Short*): IdGenerator = new IdGenerator {
+    private var remainingIds = ids.toList
+
+    override def nextId(): Short = remainingIds match {
+      case nextId :: tail =>
+        remainingIds = tail
+        nextId
+      case Nil =>
+        throw new AssertionError("No deterministic DNS request ids remaining")
+    }
+  }
+
+  private def resolver(clients: List[ActorRef], config: Config, idGenerator: 
IdGenerator = IdGenerator()): ActorRef = {
     val settings = new DnsSettings(system.asInstanceOf[ExtendedActorSystem], 
config)
     system.actorOf(Props(new AsyncDnsResolver(settings, new SimpleDnsCache(),
       (_, _) => {
         clients
-      }, IdGenerator())))
+      }, idGenerator)))
   }
 }
diff --git 
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala 
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
index 7804a74784..b3ce2f2cfe 100644
--- 
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
+++ 
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
@@ -16,12 +16,12 @@ package org.apache.pekko.io.dns.internal
 import java.net.{ Inet4Address, Inet6Address, InetAddress, InetSocketAddress }
 
 import scala.collection.immutable
-import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
 import scala.concurrent.ExecutionContext.parasitic
 import scala.util.{ Failure, Success, Try }
 
 import org.apache.pekko
-import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory, Props, 
Status }
+import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory, 
NoSerializationVerificationNeeded, Props, Status }
 import pekko.annotation.InternalApi
 import pekko.io.SimpleDnsCache
 import pekko.io.dns._
@@ -87,6 +87,7 @@ private[io] final class AsyncDnsResolver(
     settings.NDots)
 
   private val resolvers: List[ActorRef] = clientFactory(context, nameServers)
+  private val requestIdInjector: ActorRef = 
context.actorOf(RequestIdInjector.props(idGenerator), "requestIdInjector")
 
   // tracks in-flight resolutions by (name, requestType) -> list of senders 
waiting for the result
   private var inFlight: Map[(String, RequestType), List[ActorRef]] = Map.empty
@@ -126,7 +127,7 @@ private[io] final class AsyncDnsResolver(
             // spawn an actor to manage this resolution (apply search names, 
failover to other resolvers, etc.)
             inFlight = inFlight.updated((name, mode), List(sender()))
             context.actorOf(
-              DnsResolutionActor.props(settings, idGenerator, name, mode, 
self, resolvers))
+              DnsResolutionActor.props(settings, requestIdInjector, name, 
mode, self, resolvers))
           }
       }
 
@@ -185,22 +186,91 @@ private[pekko] object AsyncDnsResolver {
       mode: RequestType,
       result: Try[DnsProtocol.Resolved])
 
+  private sealed trait DnsQuestionPreInjection extends 
NoSerializationVerificationNeeded {
+    def requestId: Long
+    def resolver: ActorRef
+    def timeout: Timeout
+    def withId(id: Short): DnsQuestion
+  }
+
+  private final case class Question4PreInjection(requestId: Long, resolver: 
ActorRef, name: String, timeout: Timeout)
+      extends DnsQuestionPreInjection {
+    override def withId(id: Short): DnsQuestion = Question4(id, name)
+  }
+
+  private final case class Question6PreInjection(requestId: Long, resolver: 
ActorRef, name: String, timeout: Timeout)
+      extends DnsQuestionPreInjection {
+    override def withId(id: Short): DnsQuestion = Question6(id, name)
+  }
+
+  private final case class SrvQuestionPreInjection(requestId: Long, resolver: 
ActorRef, name: String, timeout: Timeout)
+      extends DnsQuestionPreInjection {
+    override def withId(id: Short): DnsQuestion = SrvQuestion(id, name)
+  }
+
+  private final case class DnsQuestionAnswer(
+      replyTo: ActorRef,
+      request: DnsQuestionPreInjection,
+      question: DnsQuestion,
+      result: Try[Any])
+      extends NoSerializationVerificationNeeded
+
+  private final case class InjectedDnsQuestionAnswer(requestId: Long, result: 
Try[Answer])
+      extends NoSerializationVerificationNeeded
+
+  private object RequestIdInjector {
+    def props(idGenerator: IdGenerator): Props = Props(new 
RequestIdInjector(idGenerator))
+  }
+
+  private class RequestIdInjector(idGenerator: IdGenerator) extends Actor {
+    private implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+    override def receive: Receive = {
+      case question: DnsQuestionPreInjection =>
+        sendQuestion(sender(), question, question.withId(idGenerator.nextId()))
+
+      case DnsQuestionAnswer(replyTo, request, _, Success(result: Answer)) =>
+        replyTo ! InjectedDnsQuestionAnswer(request.requestId, Success(result))
+
+      case DnsQuestionAnswer(replyTo, request, question, 
Success(DuplicateId(_))) =>
+        sendQuestion(replyTo, request, question.withId(idGenerator.nextId()))
+
+      case DnsQuestionAnswer(replyTo, request, question, Failure(t)) =>
+        request.resolver ! DropRequest(question)
+        replyTo ! InjectedDnsQuestionAnswer(request.requestId, Failure(t))
+
+      case DnsQuestionAnswer(replyTo, request, question, Success(a)) =>
+        request.resolver ! DropRequest(question)
+        replyTo ! InjectedDnsQuestionAnswer(
+          request.requestId,
+          Failure(
+            new IllegalArgumentException("Unexpected response " + a.toString + 
" of type " + a.getClass.toString)))
+    }
+
+    private def sendQuestion(replyTo: ActorRef, request: 
DnsQuestionPreInjection, question: DnsQuestion): Unit = {
+      implicit val askTimeout: Timeout = request.timeout
+      (request.resolver ? question).onComplete { result =>
+        self ! DnsQuestionAnswer(replyTo, request, question, result)
+      }
+    }
+  }
+
   private object DnsResolutionActor {
     def props(
         settings: DnsSettings,
-        idGenerator: IdGenerator,
+        requestIdInjector: ActorRef,
         name: String,
         mode: RequestType,
         responseActor: ActorRef,
         resolvers: List[ActorRef]): Props =
-      Props(new DnsResolutionActor(settings, idGenerator, name, mode, 
responseActor, resolvers))
+      Props(new DnsResolutionActor(settings, requestIdInjector, name, mode, 
responseActor, resolvers))
   }
 
   // Per-request actor that manages DNS resolution: applies search domains, 
fails over to other resolvers.
   // Reports the final result back to `responseActor` (the AsyncDnsResolver) 
via `ResolutionAnswer`.
   private class DnsResolutionActor(
       settings: DnsSettings,
-      idGenerator: IdGenerator,
+      requestIdInjector: ActorRef,
       name: String,
       mode: RequestType,
       responseActor: ActorRef,
@@ -210,6 +280,8 @@ private[pekko] object AsyncDnsResolver {
 
     private implicit val timeout: Timeout = Timeout(settings.ResolveTimeout)
     private implicit val ec: ExecutionContextExecutor = context.dispatcher
+    private var nextRequestId = 0L
+    private var pendingQuestions = Map.empty[Long, Promise[Answer]]
 
     private def failToResolve(): Unit = {
       responseActor ! ResolutionAnswer(name, mode, 
Failure(AsyncDnsResolver.failToResolve(name, settings.NameServers)))
@@ -245,11 +317,17 @@ private[pekko] object AsyncDnsResolver {
     // safe, already verified that resolvers is non-empty
     startResolution(namesToResolve, resolvers.head, resolvers.tail)
 
+    private def questionAnswer: Receive = {
+      case InjectedDnsQuestionAnswer(requestId, result) =>
+        pendingQuestions.get(requestId).foreach(_.complete(result))
+        pendingQuestions -= requestId
+    }
+
     private def activelyResolving(
         searchName: String,
         resolver: ActorRef,
         nextNamesToTry: List[String],
-        nextResolversToTry: List[ActorRef]): Receive = {
+        nextResolversToTry: List[ActorRef]): Receive = questionAnswer.orElse {
       case resolved: DnsProtocol.Resolved =>
         if (resolved.records.isEmpty) {
           if (nextNamesToTry.nonEmpty) startResolution(nextNamesToTry, 
resolver, nextResolversToTry)
@@ -296,31 +374,24 @@ private[pekko] object AsyncDnsResolver {
       context.stop(self)
     }
 
-    private def sendQuestion(resolver: ActorRef, message: DnsQuestion): 
Future[Answer] = {
-      (resolver ? message).transformWith {
-        case Success(result: Answer) =>
-          Future.successful(result)
-        case Success(DuplicateId(_)) =>
-          sendQuestion(resolver, message.withId(idGenerator.nextId()))
-        case Failure(t) =>
-          resolver ! DropRequest(message)
-          Future.failed(t)
-        case Success(a) =>
-          resolver ! DropRequest(message)
-          Future.failed(
-            new IllegalArgumentException("Unexpected response " + a.toString + 
" of type " + a.getClass.toString))
-      }
+    private def sendQuestion(createQuestion: Long => DnsQuestionPreInjection): 
Future[Answer] = {
+      nextRequestId += 1
+      val requestId = nextRequestId
+      val promise = Promise[Answer]()
+      pendingQuestions += requestId -> promise
+      requestIdInjector.tell(createQuestion(requestId), self)
+      promise.future
     }
 
     private def resolvedFut(searchName: String, resolver: ActorRef): 
Future[DnsProtocol.Resolved] =
       mode match {
         case Ip(ipv4, ipv6) =>
           val ipv4Recs =
-            if (ipv4) sendQuestion(resolver, Question4(idGenerator.nextId(), 
searchName))
+            if (ipv4) sendQuestion(Question4PreInjection(_, resolver, 
searchName, timeout))
             else Empty
 
           val ipv6Recs =
-            if (ipv6) sendQuestion(resolver, Question6(idGenerator.nextId(), 
searchName))
+            if (ipv6) sendQuestion(Question6PreInjection(_, resolver, 
searchName, timeout))
             else Empty
 
           ipv4Recs.flatMap { v4 =>
@@ -330,7 +401,7 @@ private[pekko] object AsyncDnsResolver {
           }(parasitic)
 
         case Srv =>
-          sendQuestion(resolver, SrvQuestion(idGenerator.nextId(), 
searchName)).map { answer =>
+          sendQuestion(SrvQuestionPreInjection(_, resolver, searchName, 
timeout)).map { answer =>
             DnsProtocol.Resolved(searchName, answer.rrs, answer.additionalRecs)
           }(parasitic)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to