This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 466d4a500c feat: Add Pattern timeout support (#1424)
466d4a500c is described below

commit 466d4a500c76fb85b2ebfddca3c709a439e89514
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri Feb 28 11:37:26 2025 +0800

    feat: Add Pattern timeout support (#1424)
---
 .../org/apache/pekko/pattern/PatternsTest.java     | 47 ++++++++++++++++
 .../org/apache/pekko/pattern/PatternSpec.scala     | 23 +++++++-
 .../pekko/pattern/FutureTimeoutSupport.scala       | 65 +++++++++++++++++++++-
 .../scala/org/apache/pekko/pattern/Patterns.scala  | 12 ++++
 4 files changed, 142 insertions(+), 5 deletions(-)

diff --git a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java
index aa75a4f3a4..1932f8ef75 100644
--- a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java
+++ b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java
@@ -19,6 +19,7 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
 import org.apache.pekko.testkit.PekkoSpec;
 import org.apache.pekko.testkit.TestProbe;
 import org.apache.pekko.util.Timeout;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.scalatestplus.junit.JUnitSuite;
@@ -36,6 +37,7 @@ import static org.apache.pekko.pattern.Patterns.ask;
 import static org.apache.pekko.pattern.Patterns.pipe;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */
 public class PatternsTest extends JUnitSuite {
@@ -485,6 +487,51 @@ public class PatternsTest extends JUnitSuite {
     assertEquals(expected, actual);
   }
 
+  /** Expects a stage that is already completed with a value to yield that same value. */
+  @Test
+  public void testCompletedStageWithTimeout() throws Exception {
+    final String greeting = "Hello";
+    final CompletionStage<String> guarded =
+        Patterns.timeout(
+            Duration.ofMillis(200),
+            system.scheduler(),
+            ec,
+            () -> {
+              final CompletableFuture<String> alreadyDone = new CompletableFuture<>();
+              alreadyDone.complete(greeting);
+              return alreadyDone;
+            });
+    final CompletableFuture<String> asFuture = guarded.toCompletableFuture();
+    assertEquals(greeting, asFuture.get(3, SECONDS));
+  }
+
+  /**
+   * Expects a stage that is already completed exceptionally to surface its original exception
+   * (wrapped in an {@link ExecutionException} by {@code get}).
+   */
+  @Test
+  public void testFailedCompletedStageWithTimeout() throws Exception {
+    final CompletionStage<String> delayedStage =
+        Patterns.timeout(
+            Duration.ofMillis(200),
+            system.scheduler(),
+            ec,
+            () -> {
+              CompletableFuture<String> f = new CompletableFuture<>();
+              f.completeExceptionally(new IllegalStateException("Illegal!"));
+              return f;
+            });
+    try {
+      delayedStage.toCompletableFuture().get(3, SECONDS);
+      // Without this the test would pass silently if no exception were thrown at all.
+      Assert.fail("Expected an ExecutionException caused by IllegalStateException");
+    } catch (ExecutionException e) {
+      assertTrue(e.getCause() instanceof IllegalStateException);
+      assertEquals("Illegal!", e.getCause().getMessage());
+    }
+  }
+
+  /**
+   * Expects a stage that never completes on its own to be failed with a {@link TimeoutException}
+   * carrying the configured duration in its message.
+   */
+  @Test
+  public void testCompletedWithTimeout() throws Exception {
+    final CompletionStage<String> delayedStage =
+        Patterns.timeout(Duration.ofMillis(200), system.scheduler(), ec, CompletableFuture::new);
+    try {
+      delayedStage.toCompletableFuture().get(3, SECONDS);
+      // Without this the test would pass silently if the stage completed normally.
+      Assert.fail("Expected an ExecutionException caused by TimeoutException");
+    } catch (ExecutionException e) {
+      assertTrue(e.getCause() instanceof TimeoutException);
+      assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage());
+    }
+  }
+
   @Test
   public void testGracefulStop() throws Exception {
     ActorRef target = system.actorOf(Props.create(StopActor.class));
diff --git a/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala
index 56853ba50f..e18148d849 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/PatternSpec.scala
@@ -13,10 +13,8 @@
 
 package org.apache.pekko.pattern
 
-import scala.concurrent.{ Await, Future, Promise }
-import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.{ Await, ExecutionContextExecutor, Future, Promise, TimeoutException }
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.actor.{ Actor, Props }
 import pekko.testkit.{ PekkoSpec, TestLatch }
@@ -76,4 +74,23 @@ class PatternSpec extends PekkoSpec {
       intercept[IllegalStateException] { Await.result(r, remainingOrDefault) }.getMessage should ===("Mexico")
     }
   }
+
+  "pattern.timeout" must {
+    "be completed successfully eventually" in {
+      // The wrapped future already holds a value, so that value is expected back.
+      val guarded = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.successful(5))
+      val outcome = Await.result(guarded, remainingOrDefault)
+      outcome should ===(5)
+    }
+
+    "be completed abnormally eventually" in {
+      // The wrapped future has already failed; its exception is expected to surface.
+      val guarded =
+        pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.failed(new IllegalStateException("ABC")))
+      val thrown = intercept[IllegalStateException] { Await.result(guarded, remainingOrDefault) }
+      thrown.getMessage should ===("ABC")
+    }
+
+    "be completed with a TimeoutException if not completed within the specified time" in {
+      // Future.never does not complete on its own, so only the timeout can end the wait.
+      val guarded = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.never)
+      intercept[TimeoutException] { Await.result(guarded, remainingOrDefault) }
+    }
+  }
+
 }
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala
index 2be1ad59b6..c37d8b4fd1 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala
@@ -13,8 +13,7 @@
 
 package org.apache.pekko.pattern
 
-import java.util.concurrent.CompletableFuture
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutException }
 
 import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.concurrent.duration.FiniteDuration
@@ -87,4 +86,66 @@ trait FutureTimeoutSupport {
       }
       p
     }
+
+  /**
+   * Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]]
+   * if the provided value is not completed within the specified duration.
+   *
+   * A future that is already completed when it is inspected is returned as is and nothing is scheduled.
+   * @since 1.2.0
+   */
+  def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
+      implicit ec: ExecutionContext): Future[T] = {
+    // Evaluating the by-name argument may itself throw; surface that as a failed future.
+    val underlying =
+      try value
+      catch {
+        case NonFatal(cause) => Future.failed(cause)
+      }
+    if (underlying.isCompleted) underlying
+    else {
+      val result = Promise[T]()
+      val timeoutTask = using.scheduleOnce(duration) {
+        // Whichever of the timeout and the real completion arrives first wins the promise.
+        result.tryFailure(new TimeoutException(s"Timeout of $duration expired"))
+        if (underlying.isInstanceOf[CompletableFuture[T @unchecked]]) {
+          underlying.asInstanceOf[CompletableFuture[T]]
+            .toCompletableFuture
+            .cancel(true)
+        }
+      }
+      underlying.onComplete { outcome =>
+        // The underlying future finished; the scheduled timeout is no longer needed.
+        timeoutTask.cancel()
+        result.tryComplete(outcome)
+      }(pekko.dispatch.ExecutionContexts.parasitic)
+      result.future
+    }
+  }
+
+  /**
+   * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]]
+   * if the provided value is not completed within the specified duration.
+   *
+   * A stage that is already done when it is inspected is returned as is. Otherwise the returned stage
+   * mirrors the outcome of the provided one, including a successful completion with `null`
+   * (for example a `CompletionStage[Void]`).
+   *
+   * @param duration how long to wait before failing the returned stage
+   * @param using the scheduler on which the timeout is scheduled
+   * @param value by-name expression producing the stage; a non-fatal exception thrown while evaluating it
+   *              is passed to `Futures.failedCompletionStage`
+   * @since 1.2.0
+   */
+  def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
+      implicit ec: ExecutionContext): CompletionStage[T] = {
+    val stage: CompletionStage[T] =
+      try value
+      catch {
+        case NonFatal(t) => Futures.failedCompletionStage(t)
+      }
+    if (stage.toCompletableFuture.isDone) {
+      stage
+    } else {
+      val p = new CompletableFuture[T]
+      val timeout = using.scheduleOnce(duration) {
+        p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
+        stage.toCompletableFuture.cancel(true)
+      }
+      stage.handle[Unit]((v: T, ex: Throwable) => {
+        timeout.cancel()
+        // Decide on the exception only: a successful result may legitimately be null, and since the
+        // timeout has just been cancelled, skipping that case would leave `p` incomplete forever.
+        if (ex != null) p.completeExceptionally(ex)
+        else p.complete(v)
+      })
+      p
+    }
+  }
+
 }
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
index 2b8759e026..c263d852db 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
@@ -465,6 +465,18 @@ object Patterns {
       value: Callable[CompletionStage[T]]): CompletionStage[T] =
     afterCompletionStage(duration.asScala, scheduler)(value.call())(context)
 
+  /**
+   * Variant taking a `java.time.Duration` and a `Callable`: returns a
+   * [[java.util.concurrent.CompletionStage]] that will be completed with a
+   * [[java.util.concurrent.TimeoutException]] if the provided value is not completed within the
+   * specified duration.
+   * @since 1.2.0
+   */
+  def timeout[T](
+      duration: java.time.Duration,
+      scheduler: Scheduler,
+      context: ExecutionContext,
+      value: Callable[CompletionStage[T]]): CompletionStage[T] = {
+    val limit = duration.asScala
+    // `value.call()` stays in the by-name argument position so it is evaluated by the callee.
+    timeoutCompletionStage(limit, scheduler)(value.call())(context)
+  }
+
   /**
    * Returns a [[scala.concurrent.Future]] that will be completed with the 
success or failure of the provided Callable
    * after the specified duration.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to