This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 466d4a500c feat: Add Pattern timeout support (#1424)
466d4a500c is described below
commit 466d4a500c76fb85b2ebfddca3c709a439e89514
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri Feb 28 11:37:26 2025 +0800
feat: Add Pattern timeout support (#1424)
---
.../org/apache/pekko/pattern/PatternsTest.java | 47 ++++++++++++++++
.../org/apache/pekko/pattern/PatternSpec.scala | 23 +++++++-
.../pekko/pattern/FutureTimeoutSupport.scala | 65 +++++++++++++++++++++-
.../scala/org/apache/pekko/pattern/Patterns.scala | 12 ++++
4 files changed, 142 insertions(+), 5 deletions(-)
diff --git a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java
index aa75a4f3a4..1932f8ef75 100644
--- a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java
+++ b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java
@@ -19,6 +19,7 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.util.Timeout;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
@@ -36,6 +37,7 @@ import static org.apache.pekko.pattern.Patterns.ask;
import static org.apache.pekko.pattern.Patterns.pipe;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */
public class PatternsTest extends JUnitSuite {
@@ -485,6 +487,51 @@ public class PatternsTest extends JUnitSuite {
assertEquals(expected, actual);
}
+  /** A stage that is already complete must hand its value through {@code Patterns.timeout}. */
+  @Test
+  public void testCompletedStageWithTimeout() throws Exception {
+    final String greeting = "Hello";
+    final CompletionStage<String> guarded =
+        Patterns.timeout(
+            Duration.ofMillis(200),
+            system.scheduler(),
+            ec,
+            () -> CompletableFuture.completedFuture(greeting));
+    assertEquals(greeting, guarded.toCompletableFuture().get(3, SECONDS));
+  }
+
+  /**
+   * A stage that is already failed when the callable returns must surface its original
+   * exception rather than a timeout.
+   */
+  @Test
+  public void testFailedCompletedStageWithTimeout() throws Exception {
+    final CompletionStage<String> delayedStage =
+        Patterns.timeout(
+            Duration.ofMillis(200),
+            system.scheduler(),
+            ec,
+            () -> {
+              CompletableFuture<String> f = new CompletableFuture<>();
+              f.completeExceptionally(new IllegalStateException("Illegal!"));
+              return f;
+            });
+    try {
+      delayedStage.toCompletableFuture().get(3, SECONDS);
+      // Without this the test would pass vacuously if the stage completed normally.
+      Assert.fail("Expected the stage to fail with IllegalStateException");
+    } catch (ExecutionException e) {
+      assertTrue(e.getCause() instanceof IllegalStateException);
+      assertEquals("Illegal!", e.getCause().getMessage());
+    }
+  }
+
+  /**
+   * A stage that never completes must be failed with a {@link TimeoutException} carrying the
+   * configured duration in its message.
+   */
+  @Test
+  public void testCompletedWithTimeout() throws Exception {
+    final CompletionStage<String> delayedStage =
+        Patterns.timeout(Duration.ofMillis(200), system.scheduler(), ec, CompletableFuture::new);
+    try {
+      delayedStage.toCompletableFuture().get(3, SECONDS);
+      // Without this the test would pass vacuously if the stage completed normally.
+      Assert.fail("Expected the stage to fail with TimeoutException");
+    } catch (ExecutionException e) {
+      assertTrue(e.getCause() instanceof TimeoutException);
+      assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage());
+    }
+  }
+
@Test
public void testGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class));
diff --git a/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala
index 56853ba50f..e18148d849 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala
@@ -13,10 +13,8 @@
package org.apache.pekko.pattern
-import scala.concurrent.{ Await, Future, Promise }
-import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.{ Await, ExecutionContextExecutor, Future, Promise,
TimeoutException }
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ PekkoSpec, TestLatch }
@@ -76,4 +74,23 @@ class PatternSpec extends PekkoSpec {
intercept[IllegalStateException] { Await.result(r, remainingOrDefault)
}.getMessage should ===("Mexico")
}
}
+
+  "pattern.timeout" must {
+    // A future that is already successful is handed back with its value intact.
+    "be completed successfully eventually" in {
+      val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.successful(5))
+      Await.result(f, remainingOrDefault) should ===(5)
+    }
+
+    // A future that is already failed keeps its original exception.
+    "be completed abnormally eventually" in {
+      val f =
+        pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.failed(new IllegalStateException("ABC")))
+      intercept[IllegalStateException] { Await.result(f, remainingOrDefault) }.getMessage should ===("ABC")
+    }
+
+    "be completed with a TimeoutException if not completed within the specified time" in {
+      val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.never)
+      // Await.result raises its own TimeoutException when the wait expires, so the exception
+      // type alone cannot show that pattern.timeout fired; the message can.
+      intercept[TimeoutException] { Await.result(f, remainingOrDefault) }.getMessage should ===(
+        "Timeout of 100 milliseconds expired")
+    }
+  }
+
}
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala
index 2be1ad59b6..c37d8b4fd1 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala
@@ -13,8 +13,7 @@
package org.apache.pekko.pattern
-import java.util.concurrent.CompletableFuture
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{ CompletableFuture, CompletionStage,
TimeoutException }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration
@@ -87,4 +86,66 @@ trait FutureTimeoutSupport {
}
p
}
+
+  /**
+   * Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]]
+   * if the provided value is not completed within the specified duration.
+   *
+   * A future that is already completed when `value` returns is handed back unchanged.
+   * Otherwise a timer is scheduled on `using`; whichever of the timer and the underlying
+   * future finishes first decides the outcome of the returned future.
+   *
+   * @param duration how long to wait for `value` before failing the returned future
+   * @param using scheduler that runs the timeout task
+   * @param value by-name future to guard; a non-fatal exception thrown while evaluating it
+   *              is turned into a failed future
+   * @since 1.2.0
+   */
+  def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
+      implicit ec: ExecutionContext): Future[T] = {
+    val underlying: Future[T] =
+      try value
+      catch {
+        case NonFatal(cause) => Future.failed(cause)
+      }
+    if (underlying.value.isDefined) underlying
+    else {
+      // not completed yet
+      val result = Promise[T]()
+      val timer = using.scheduleOnce(duration) {
+        result.tryFailure(new TimeoutException(s"Timeout of $duration expired"))
+        // Only a future that is also a CompletableFuture can be cancelled from here.
+        (underlying: Any) match {
+          case cancellable: CompletableFuture[_] => cancellable.toCompletableFuture.cancel(true)
+          case _                                 => ()
+        }
+      }
+      underlying.onComplete { outcome =>
+        timer.cancel()
+        result.tryComplete(outcome)
+      }(pekko.dispatch.ExecutionContexts.parasitic)
+      result.future
+    }
+  }
+
+  /**
+   * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]]
+   * if the provided value is not completed within the specified duration.
+   *
+   * A stage that is already done when `value` returns is handed back unchanged. A stage that
+   * completes normally with `null` (for example a `CompletionStage[Void]`) completes the
+   * returned stage with `null` as well.
+   *
+   * @param duration how long to wait for `value` before failing the returned stage
+   * @param using scheduler that runs the timeout task
+   * @param value by-name stage to guard; a non-fatal exception thrown while evaluating it
+   *              is passed to `Futures.failedCompletionStage`
+   * @since 1.2.0
+   */
+  def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
+      implicit ec: ExecutionContext): CompletionStage[T] = {
+    val stage: CompletionStage[T] =
+      try value
+      catch {
+        case NonFatal(t) => Futures.failedCompletionStage(t)
+      }
+    if (stage.toCompletableFuture.isDone) {
+      stage
+    } else {
+      val p = new CompletableFuture[T]
+      val timeout = using.scheduleOnce(duration) {
+        p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
+        stage.toCompletableFuture.cancel(true)
+      }
+      stage.handle[Unit]((v: T, ex: Throwable) => {
+        timeout.cancel()
+        // Branch on the exception only: a normal completion may legitimately carry null,
+        // and testing `v != null` would leave `p` uncompleted forever in that case.
+        if (ex != null) p.completeExceptionally(ex)
+        else p.complete(v)
+        ()
+      })
+      p
+    }
+  }
+
}
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
index 2b8759e026..c263d852db 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
@@ -465,6 +465,18 @@ object Patterns {
value: Callable[CompletionStage[T]]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value.call())(context)
+  /**
+   * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a
+   * [[java.util.concurrent.TimeoutException]] if the provided value is not completed
+   * within the specified duration.
+   *
+   * Java API: delegates to `timeoutCompletionStage` after converting `duration` to its
+   * Scala counterpart; `value` is invoked lazily by that method.
+   * @since 1.2.0
+   */
+  def timeout[T](
+      duration: java.time.Duration,
+      scheduler: Scheduler,
+      context: ExecutionContext,
+      value: Callable[CompletionStage[T]]): CompletionStage[T] = {
+    val limit = duration.asScala
+    timeoutCompletionStage(limit, scheduler)(value.call())(context)
+  }
+
/**
* Returns a [[scala.concurrent.Future]] that will be completed with the
success or failure of the provided Callable
* after the specified duration.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]