This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 873868c87b =str Add dedicated stream timeout exceptions for stream 
timeout operators.
873868c87b is described below

commit 873868c87b39461c9d87a17918a1eb881b61a593
Author: He-Pin <[email protected]>
AuthorDate: Sat Dec 23 16:22:21 2023 +0800

    =str Add dedicated stream timeout exceptions for stream timeout operators.
---
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 15 ++++----
 .../apache/pekko/stream/javadsl/SourceTest.java    | 18 ++++------
 .../pekko/stream/StreamTimeoutException.scala      | 40 ++++++++++++++++++++++
 .../org/apache/pekko/stream/impl/Timers.scala      | 15 ++++----
 .../org/apache/pekko/stream/javadsl/BidiFlow.scala |  4 +--
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 16 ++++-----
 .../org/apache/pekko/stream/javadsl/Source.scala   | 16 ++++-----
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 16 ++++-----
 .../apache/pekko/stream/javadsl/SubSource.scala    | 16 ++++-----
 .../apache/pekko/stream/scaladsl/BidiFlow.scala    |  2 +-
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  8 ++---
 11 files changed, 100 insertions(+), 66 deletions(-)

diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index c920a8307c..f9a8e2306e 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1364,10 +1364,9 @@ public class FlowTest extends StreamTest {
                     .runWith(Sink.head(), system)
                     .toCompletableFuture()
                     .get(3, TimeUnit.SECONDS));
-    assertEquals(
+    assertTrue(
         "A TimeoutException was expected",
-        TimeoutException.class,
-        executionException.getCause().getClass());
+        
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
   }
 
   @Test
@@ -1381,10 +1380,9 @@ public class FlowTest extends StreamTest {
                     .runWith(Sink.head(), system)
                     .toCompletableFuture()
                     .get(3, TimeUnit.SECONDS));
-    assertEquals(
+    assertTrue(
         "A TimeoutException was expected",
-        TimeoutException.class,
-        executionException.getCause().getClass());
+        
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
   }
 
   @Test
@@ -1398,10 +1396,9 @@ public class FlowTest extends StreamTest {
                     .runWith(Sink.head(), system)
                     .toCompletableFuture()
                     .get(3, TimeUnit.SECONDS));
-    assertEquals(
+    assertTrue(
         "A TimeoutException was expected",
-        TimeoutException.class,
-        executionException.getCause().getClass());
+        
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
   }
 
   @Test
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index f385ecb807..7849921b5d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -55,8 +55,7 @@ import static 
org.apache.pekko.stream.testkit.StreamTestKit.PublisherProbeSubscr
 import static org.apache.pekko.stream.testkit.TestPublisher.ManualProbe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 @SuppressWarnings("serial")
 public class SourceTest extends StreamTest {
@@ -1204,10 +1203,9 @@ public class SourceTest extends StreamTest {
                     .runWith(Sink.head(), system)
                     .toCompletableFuture()
                     .get(3, TimeUnit.SECONDS));
-    assertEquals(
+    assertTrue(
         "The cause of ExecutionException should be TimeoutException",
-        TimeoutException.class,
-        exception.getCause().getClass());
+        
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
   }
 
   @Test
@@ -1222,10 +1220,9 @@ public class SourceTest extends StreamTest {
                     .runWith(Sink.head(), system)
                     .toCompletableFuture()
                     .get(3, TimeUnit.SECONDS));
-    assertEquals(
+    assertTrue(
         "The cause of ExecutionException should be TimeoutException",
-        TimeoutException.class,
-        exception.getCause().getClass());
+        
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
   }
 
   @Test
@@ -1240,10 +1237,9 @@ public class SourceTest extends StreamTest {
                     .runWith(Sink.head(), system)
                     .toCompletableFuture()
                     .get(3, TimeUnit.SECONDS));
-    assertEquals(
+    assertTrue(
         "The cause of ExecutionException should be TimeoutException",
-        TimeoutException.class,
-        exception.getCause().getClass());
+        
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
   }
 
   @Test
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala 
b/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala
new file mode 100644
index 0000000000..9be91f9967
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import org.apache.pekko
+import pekko.annotation.DoNotInherit
+
+import scala.concurrent.TimeoutException
+import scala.util.control.NoStackTrace
+
+/**
+ * Base class for timeout exceptions specific to Pekko Streams
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+sealed class StreamTimeoutException(msg: String) extends TimeoutException(msg) 
with NoStackTrace
+
+final class InitialTimeoutException(msg: String) extends 
StreamTimeoutException(msg)
+
+final class CompletionTimeoutException(msg: String) extends 
StreamTimeoutException(msg)
+
+final class StreamIdleTimeoutException(msg: String) extends 
StreamTimeoutException(msg)
+
+final class BackpressureTimeoutException(msg: String) extends 
StreamTimeoutException(msg)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
index dacb2441cc..cdbc42828f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.stream.impl
 
-import java.util.concurrent.{ TimeUnit, TimeoutException }
+import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration.{ Duration, FiniteDuration }
 
@@ -69,12 +69,13 @@ import pekko.stream.stage._
 
         final override protected def onTimer(key: Any): Unit =
           if (!initialHasPassed)
-            failStage(new TimeoutException(s"The first element has not yet 
passed through in $timeout."))
+            failStage(
+              new InitialTimeoutException(s"The first element has not yet 
passed through in ${timeout.toCoarsest}."))
 
         override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, 
timeout)
       }
 
-    override def toString = "InitialTimeoutTimer"
+    override def toString = "InitialTimeout"
 
   }
 
@@ -90,7 +91,7 @@ import pekko.stream.stage._
         override def onPull(): Unit = pull(in)
 
         final override protected def onTimer(key: Any): Unit =
-          failStage(new TimeoutException(s"The stream has not been completed 
in $timeout."))
+          failStage(new CompletionTimeoutException(s"The stream has not been 
completed in ${timeout.toCoarsest}."))
 
         override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, 
timeout)
       }
@@ -117,7 +118,7 @@ import pekko.stream.stage._
 
         final override protected def onTimer(key: Any): Unit =
           if (nextDeadline - System.nanoTime < 0)
-            failStage(new TimeoutException(s"No elements passed in the last 
$timeout."))
+            failStage(new StreamIdleTimeoutException(s"No elements passed in 
the last ${timeout.toCoarsest}."))
 
         override def preStart(): Unit =
           scheduleWithFixedDelay(GraphStageLogicTimer, 
timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
@@ -150,7 +151,7 @@ import pekko.stream.stage._
 
         final override protected def onTimer(key: Any): Unit =
           if (waitingDemand && (nextDeadline - System.nanoTime < 0))
-            failStage(new TimeoutException(s"No demand signalled in the last 
$timeout."))
+            failStage(new BackpressureTimeoutException(s"No demand signalled 
in the last ${timeout.toCoarsest}."))
 
         override def preStart(): Unit =
           scheduleWithFixedDelay(GraphStageLogicTimer, 
timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
@@ -179,7 +180,7 @@ import pekko.stream.stage._
 
       final override def onTimer(key: Any): Unit =
         if (nextDeadline - System.nanoTime < 0)
-          failStage(new TimeoutException(s"No elements passed in the last 
$timeout."))
+          failStage(new StreamIdleTimeoutException(s"No elements passed in the 
last ${timeout.toCoarsest}."))
 
       override def preStart(): Unit =
         scheduleWithFixedDelay(GraphStageLogicTimer, 
timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
index 2ea80b6b5e..809dbebdca 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
@@ -97,7 +97,7 @@ object BidiFlow {
 
   /**
    * If the time between two processed elements *in any direction* exceed the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.BackpressureTimeoutException]].
    *
    * There is a difference between this operator and having two idleTimeout 
Flows assembled into a BidiStage.
    * If the timeout is configured to be 1 seconds, then this operator will not 
fail even though there are elements flowing
@@ -110,7 +110,7 @@ object BidiFlow {
 
   /**
    * If the time between two processed elements *in any direction* exceed the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.BackpressureTimeoutException]].
    *
    * There is a difference between this operator and having two idleTimeout 
Flows assembled into a BidiStage.
    * If the timeout is configured to be 1 seconds, then this operator will not 
fail even though there are elements flowing
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 0b2d60c58e..948b220544 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -3443,7 +3443,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -3459,7 +3459,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -3475,7 +3475,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -3491,7 +3491,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -3507,7 +3507,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -3524,7 +3524,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -3541,7 +3541,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -3558,7 +3558,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 1f6e80d60d..91cd27aa48 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3997,7 +3997,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -4013,7 +4013,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -4029,7 +4029,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -4045,7 +4045,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -4061,7 +4061,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -4078,7 +4078,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -4095,7 +4095,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -4112,7 +4112,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index a4b2042ec0..25cf5107b1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -2071,7 +2071,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2087,7 +2087,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2103,7 +2103,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2119,7 +2119,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2135,7 +2135,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2152,7 +2152,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2169,7 +2169,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2186,7 +2186,7 @@ class SubFlow[In, Out, Mat](
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index e0f85c86a1..15f407f069 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -2048,7 +2048,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2064,7 +2064,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2080,7 +2080,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2096,7 +2096,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2112,7 +2112,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2129,7 +2129,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2146,7 +2146,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2163,7 +2163,7 @@ class SubSource[Out, Mat](
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[java.util.concurrent.TimeoutException]]. 
The timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
index 7bdbec9dfa..e756f83353 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
@@ -312,7 +312,7 @@ object BidiFlow {
 
   /**
    * If the time between two processed elements *in any direction* exceed the 
provided timeout, the stream is failed
-   * with a [[scala.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.StreamIdleTimeoutException]].
    *
    * There is a difference between this operator and having two idleTimeout 
Flows assembled into a BidiStage.
    * If the timeout is configured to be 1 seconds, then this operator will not 
fail even though there are elements flowing
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index ec1586d831..de4c81e493 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -2461,7 +2461,7 @@ trait FlowOps[+Out, +Mat] {
 
   /**
    * If the first element has not passed through this operator before the 
provided timeout, the stream is failed
-   * with a [[scala.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.InitialTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2475,7 +2475,7 @@ trait FlowOps[+Out, +Mat] {
 
   /**
    * If the completion of the stream does not happen until the provided 
timeout, the stream is failed
-   * with a [[scala.concurrent.TimeoutException]].
+   * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
    *
    * '''Emits when''' upstream emits an element
    *
@@ -2489,7 +2489,7 @@ trait FlowOps[+Out, +Mat] {
 
   /**
    * If the time between two processed elements exceeds the provided timeout, 
the stream is failed
-   * with a [[scala.concurrent.TimeoutException]]. The timeout is checked 
periodically,
+   * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The 
timeout is checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element
@@ -2504,7 +2504,7 @@ trait FlowOps[+Out, +Mat] {
 
   /**
    * If the time between the emission of an element and the following 
downstream demand exceeds the provided timeout,
-   * the stream is failed with a [[scala.concurrent.TimeoutException]]. The 
timeout is checked periodically,
+   * the stream is failed with a 
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is 
checked periodically,
    * so the resolution of the check is one period (equals to timeout value).
    *
    * '''Emits when''' upstream emits an element


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to