This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 93fec3e333 fix: serialize async DNS request id generation (#2958)
93fec3e333 is described below
commit 93fec3e3333aba93decf498f1d6b72abdc4b5ce7
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 10 02:19:38 2026 +0800
fix: serialize async DNS request id generation (#2958)
Motivation:
Async DNS resolution allocated request ids from future callbacks, which
makes the id generator concurrency contract harder to reason about and can race
when duplicate ids are retried.
Modification:
Add a request-id injector actor that owns id generation and duplicate-id
retries while preserving existing in-flight resolution de-duplication and
full-question DropRequest handling. Add focused tests for deterministic
duplicate-id retries, pending request ids, and per-attempt retry timeout
behavior.
Result:
Async DNS request ids are generated and retried through an actor-owned
path, and retry behavior is covered by regression tests.
Tests:
- scalafmt --mode diff-ref=origin/main / passed
- scalafmt --list --mode diff-ref=origin/main / passed
- git diff --check / passed
- sbt "actor-tests / Test / testOnly
org.apache.pekko.io.dns.internal.AsyncDnsResolverSpec" / passed
- sbt "actor-tests / Test / testOnly
org.apache.pekko.io.dns.internal.AsyncDnsResolverSpec
org.apache.pekko.io.dns.internal.DnsClientSpec" / passed
References:
Refs upstream
https://github.com/akka/akka-core/commit/ef33e71ac9ce0b5041adcbb57d210c7d029712b6,
which is now Apache licensed.
---
.../io/dns/internal/AsyncDnsResolverSpec.scala | 88 ++++++++++++++-
.../pekko/io/dns/internal/AsyncDnsResolver.scala | 119 ++++++++++++++++-----
2 files changed, 178 insertions(+), 29 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
index a3dfb031ac..13c40e872d 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolverSpec.scala
@@ -97,6 +97,8 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
}
"handle duplicate Ids in dnsClient" in new Setup {
+ override val r = resolver(List(dnsClient1.ref, dnsClient2.ref),
defaultConfig, deterministicIds(1, 2))
+
r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
val firstId = dnsClient1.expectMsgPF() {
case q4: Question4 if q4.name == "cats.com" =>
@@ -317,6 +319,11 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
}
"not reuse the request ids of pending requests" in new Setup {
+ override val r = resolver(
+ List(dnsClient1.ref, dnsClient2.ref),
+ defaultConfig,
+ deterministicIds((1 to 10).map(_.toShort): _*))
+
// Send multiple resolves for different names so no in-flight
deduplication applies
val resolveCount = 10
(1 to resolveCount).foreach { i =>
@@ -324,12 +331,71 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
}
// Each resolve should have received a Question4 from dnsClient1 with a
unique ID
- val receivedIds = (1 to resolveCount).map { _ =>
- dnsClient1.expectMsgPF(remainingOrDefault) {
+ val receivedQuestions = (1 to resolveCount).map { _ =>
+ val id = dnsClient1.expectMsgPF(remainingOrDefault) {
case Question4(id, _) => id
}
+ id -> dnsClient1.lastSender
+ }
+ receivedQuestions.map(_._1).toSet.size shouldBe resolveCount
+
+ receivedQuestions.foreach {
+ case (id, replyTo) => replyTo ! Answer(id, im.Seq.empty)
+ }
+ (1 to resolveCount).foreach { _ =>
+ senderProbe.expectMsgType[Resolved]
+ }
+ }
+
+ "retry request ids that duplicate an in-flight request" in new Setup {
+ override val r = resolver(List(dnsClient1.ref), defaultConfig,
deterministicIds(1, 1, 2))
+
+ val asker1 = TestProbe()
+ val asker2 = TestProbe()
+
+ r.tell(Resolve("host1.cats.com", Ip(ipv4 = true, ipv6 = false)),
asker1.ref)
+ val firstQuestion = dnsClient1.expectMsgPF() {
+ case q: Question4 if q.name == "host1.cats.com" => q
+ }
+ val firstQuestionSender = dnsClient1.lastSender
+
+ r.tell(Resolve("host2.cats.com", Ip(ipv4 = true, ipv6 = false)),
asker2.ref)
+ val duplicatedQuestion = dnsClient1.expectMsgPF() {
+ case q: Question4 if q.name == "host2.cats.com" && q.id ==
firstQuestion.id => q
+ }
+ dnsClient1.reply(DuplicateId(duplicatedQuestion.id))
+
+ val retriedQuestion = dnsClient1.expectMsgPF() {
+ case q: Question4 if q.name == "host2.cats.com" && q.id !=
duplicatedQuestion.id => q
+ }
+
+ firstQuestionSender ! Answer(firstQuestion.id, im.Seq.empty)
+ dnsClient1.reply(Answer(retriedQuestion.id, im.Seq.empty))
+
+ asker1.expectMsg(Resolved("host1.cats.com", im.Seq.empty))
+ asker2.expectMsg(Resolved("host2.cats.com", im.Seq.empty))
+ }
+
+ "allow duplicate id retries to use their own resolver timeout" in new
Setup {
+ val config = defaultConfig.withValue("resolve-timeout",
ConfigValueFactory.fromAnyRef("800 ms"))
+ override val r = resolver(List(dnsClient1.ref), config,
deterministicIds(1, 2))
+
+ r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false))
+ val firstQuestion = dnsClient1.expectMsgPF() {
+ case q: Question4 if q.name == "cats.com" => q
}
- receivedIds.toSet.size shouldBe resolveCount
+
+ dnsClient1.expectNoMessage(500.millis)
+ dnsClient1.reply(DuplicateId(firstQuestion.id))
+
+ val retriedQuestion = dnsClient1.expectMsgPF() {
+ case q: Question4 if q.name == "cats.com" && q.id != firstQuestion.id
=> q
+ }
+
+ dnsClient1.expectNoMessage(450.millis)
+ dnsClient1.reply(Answer(retriedQuestion.id, im.Seq.empty))
+
+ senderProbe.expectMsg(Resolved("cats.com", im.Seq.empty))
}
"reuse in-progress resolutions" in new Setup {
@@ -355,11 +421,23 @@ class AsyncDnsResolverSpec extends PekkoSpec("""
}
}
- def resolver(clients: List[ActorRef], config: Config): ActorRef = {
+ private def deterministicIds(ids: Short*): IdGenerator = new IdGenerator {
+ private var remainingIds = ids.toList
+
+ override def nextId(): Short = remainingIds match {
+ case nextId :: tail =>
+ remainingIds = tail
+ nextId
+ case Nil =>
+ throw new AssertionError("No deterministic DNS request ids remaining")
+ }
+ }
+
+ private def resolver(clients: List[ActorRef], config: Config, idGenerator:
IdGenerator = IdGenerator()): ActorRef = {
val settings = new DnsSettings(system.asInstanceOf[ExtendedActorSystem],
config)
system.actorOf(Props(new AsyncDnsResolver(settings, new SimpleDnsCache(),
(_, _) => {
clients
- }, IdGenerator())))
+ }, idGenerator)))
}
}
diff --git
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
index 7804a74784..b3ce2f2cfe 100644
---
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
+++
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/AsyncDnsResolver.scala
@@ -16,12 +16,12 @@ package org.apache.pekko.io.dns.internal
import java.net.{ Inet4Address, Inet6Address, InetAddress, InetSocketAddress }
import scala.collection.immutable
-import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
import scala.concurrent.ExecutionContext.parasitic
import scala.util.{ Failure, Success, Try }
import org.apache.pekko
-import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory, Props,
Status }
+import pekko.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory,
NoSerializationVerificationNeeded, Props, Status }
import pekko.annotation.InternalApi
import pekko.io.SimpleDnsCache
import pekko.io.dns._
@@ -87,6 +87,7 @@ private[io] final class AsyncDnsResolver(
settings.NDots)
private val resolvers: List[ActorRef] = clientFactory(context, nameServers)
+ private val requestIdInjector: ActorRef =
context.actorOf(RequestIdInjector.props(idGenerator), "requestIdInjector")
// tracks in-flight resolutions by (name, requestType) -> list of senders
waiting for the result
private var inFlight: Map[(String, RequestType), List[ActorRef]] = Map.empty
@@ -126,7 +127,7 @@ private[io] final class AsyncDnsResolver(
// spawn an actor to manage this resolution (apply search names,
failover to other resolvers, etc.)
inFlight = inFlight.updated((name, mode), List(sender()))
context.actorOf(
- DnsResolutionActor.props(settings, idGenerator, name, mode,
self, resolvers))
+ DnsResolutionActor.props(settings, requestIdInjector, name,
mode, self, resolvers))
}
}
@@ -185,22 +186,91 @@ private[pekko] object AsyncDnsResolver {
mode: RequestType,
result: Try[DnsProtocol.Resolved])
+ private sealed trait DnsQuestionPreInjection extends
NoSerializationVerificationNeeded {
+ def requestId: Long
+ def resolver: ActorRef
+ def timeout: Timeout
+ def withId(id: Short): DnsQuestion
+ }
+
+ private final case class Question4PreInjection(requestId: Long, resolver:
ActorRef, name: String, timeout: Timeout)
+ extends DnsQuestionPreInjection {
+ override def withId(id: Short): DnsQuestion = Question4(id, name)
+ }
+
+ private final case class Question6PreInjection(requestId: Long, resolver:
ActorRef, name: String, timeout: Timeout)
+ extends DnsQuestionPreInjection {
+ override def withId(id: Short): DnsQuestion = Question6(id, name)
+ }
+
+ private final case class SrvQuestionPreInjection(requestId: Long, resolver:
ActorRef, name: String, timeout: Timeout)
+ extends DnsQuestionPreInjection {
+ override def withId(id: Short): DnsQuestion = SrvQuestion(id, name)
+ }
+
+ private final case class DnsQuestionAnswer(
+ replyTo: ActorRef,
+ request: DnsQuestionPreInjection,
+ question: DnsQuestion,
+ result: Try[Any])
+ extends NoSerializationVerificationNeeded
+
+ private final case class InjectedDnsQuestionAnswer(requestId: Long, result:
Try[Answer])
+ extends NoSerializationVerificationNeeded
+
+ private object RequestIdInjector {
+ def props(idGenerator: IdGenerator): Props = Props(new
RequestIdInjector(idGenerator))
+ }
+
+ private class RequestIdInjector(idGenerator: IdGenerator) extends Actor {
+ private implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+ override def receive: Receive = {
+ case question: DnsQuestionPreInjection =>
+ sendQuestion(sender(), question, question.withId(idGenerator.nextId()))
+
+ case DnsQuestionAnswer(replyTo, request, _, Success(result: Answer)) =>
+ replyTo ! InjectedDnsQuestionAnswer(request.requestId, Success(result))
+
+ case DnsQuestionAnswer(replyTo, request, question,
Success(DuplicateId(_))) =>
+ sendQuestion(replyTo, request, question.withId(idGenerator.nextId()))
+
+ case DnsQuestionAnswer(replyTo, request, question, Failure(t)) =>
+ request.resolver ! DropRequest(question)
+ replyTo ! InjectedDnsQuestionAnswer(request.requestId, Failure(t))
+
+ case DnsQuestionAnswer(replyTo, request, question, Success(a)) =>
+ request.resolver ! DropRequest(question)
+ replyTo ! InjectedDnsQuestionAnswer(
+ request.requestId,
+ Failure(
+ new IllegalArgumentException("Unexpected response " + a.toString +
" of type " + a.getClass.toString)))
+ }
+
+ private def sendQuestion(replyTo: ActorRef, request:
DnsQuestionPreInjection, question: DnsQuestion): Unit = {
+ implicit val askTimeout: Timeout = request.timeout
+ (request.resolver ? question).onComplete { result =>
+ self ! DnsQuestionAnswer(replyTo, request, question, result)
+ }
+ }
+ }
+
private object DnsResolutionActor {
def props(
settings: DnsSettings,
- idGenerator: IdGenerator,
+ requestIdInjector: ActorRef,
name: String,
mode: RequestType,
responseActor: ActorRef,
resolvers: List[ActorRef]): Props =
- Props(new DnsResolutionActor(settings, idGenerator, name, mode,
responseActor, resolvers))
+ Props(new DnsResolutionActor(settings, requestIdInjector, name, mode,
responseActor, resolvers))
}
// Per-request actor that manages DNS resolution: applies search domains,
fails over to other resolvers.
// Reports the final result back to `responseActor` (the AsyncDnsResolver)
via `ResolutionAnswer`.
private class DnsResolutionActor(
settings: DnsSettings,
- idGenerator: IdGenerator,
+ requestIdInjector: ActorRef,
name: String,
mode: RequestType,
responseActor: ActorRef,
@@ -210,6 +280,8 @@ private[pekko] object AsyncDnsResolver {
private implicit val timeout: Timeout = Timeout(settings.ResolveTimeout)
private implicit val ec: ExecutionContextExecutor = context.dispatcher
+ private var nextRequestId = 0L
+ private var pendingQuestions = Map.empty[Long, Promise[Answer]]
private def failToResolve(): Unit = {
responseActor ! ResolutionAnswer(name, mode,
Failure(AsyncDnsResolver.failToResolve(name, settings.NameServers)))
@@ -245,11 +317,17 @@ private[pekko] object AsyncDnsResolver {
// safe, already verified that resolvers is non-empty
startResolution(namesToResolve, resolvers.head, resolvers.tail)
+ private def questionAnswer: Receive = {
+ case InjectedDnsQuestionAnswer(requestId, result) =>
+ pendingQuestions.get(requestId).foreach(_.complete(result))
+ pendingQuestions -= requestId
+ }
+
private def activelyResolving(
searchName: String,
resolver: ActorRef,
nextNamesToTry: List[String],
- nextResolversToTry: List[ActorRef]): Receive = {
+ nextResolversToTry: List[ActorRef]): Receive = questionAnswer.orElse {
case resolved: DnsProtocol.Resolved =>
if (resolved.records.isEmpty) {
if (nextNamesToTry.nonEmpty) startResolution(nextNamesToTry,
resolver, nextResolversToTry)
@@ -296,31 +374,24 @@ private[pekko] object AsyncDnsResolver {
context.stop(self)
}
- private def sendQuestion(resolver: ActorRef, message: DnsQuestion):
Future[Answer] = {
- (resolver ? message).transformWith {
- case Success(result: Answer) =>
- Future.successful(result)
- case Success(DuplicateId(_)) =>
- sendQuestion(resolver, message.withId(idGenerator.nextId()))
- case Failure(t) =>
- resolver ! DropRequest(message)
- Future.failed(t)
- case Success(a) =>
- resolver ! DropRequest(message)
- Future.failed(
- new IllegalArgumentException("Unexpected response " + a.toString +
" of type " + a.getClass.toString))
- }
+ private def sendQuestion(createQuestion: Long => DnsQuestionPreInjection):
Future[Answer] = {
+ nextRequestId += 1
+ val requestId = nextRequestId
+ val promise = Promise[Answer]()
+ pendingQuestions += requestId -> promise
+ requestIdInjector.tell(createQuestion(requestId), self)
+ promise.future
}
private def resolvedFut(searchName: String, resolver: ActorRef):
Future[DnsProtocol.Resolved] =
mode match {
case Ip(ipv4, ipv6) =>
val ipv4Recs =
- if (ipv4) sendQuestion(resolver, Question4(idGenerator.nextId(),
searchName))
+ if (ipv4) sendQuestion(Question4PreInjection(_, resolver,
searchName, timeout))
else Empty
val ipv6Recs =
- if (ipv6) sendQuestion(resolver, Question6(idGenerator.nextId(),
searchName))
+ if (ipv6) sendQuestion(Question6PreInjection(_, resolver,
searchName, timeout))
else Empty
ipv4Recs.flatMap { v4 =>
@@ -330,7 +401,7 @@ private[pekko] object AsyncDnsResolver {
}(parasitic)
case Srv =>
- sendQuestion(resolver, SrvQuestion(idGenerator.nextId(),
searchName)).map { answer =>
+ sendQuestion(SrvQuestionPreInjection(_, resolver, searchName,
timeout)).map { answer =>
DnsProtocol.Resolved(searchName, answer.rrs, answer.additionalRecs)
}(parasitic)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]