This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 873868c87b =str Add dedicated stream timeout exceptions for stream
timeout operators.
873868c87b is described below
commit 873868c87b39461c9d87a17918a1eb881b61a593
Author: He-Pin <[email protected]>
AuthorDate: Sat Dec 23 16:22:21 2023 +0800
=str Add dedicated stream timeout exceptions for stream timeout operators.
---
.../org/apache/pekko/stream/javadsl/FlowTest.java | 15 ++++----
.../apache/pekko/stream/javadsl/SourceTest.java | 18 ++++------
.../pekko/stream/StreamTimeoutException.scala | 40 ++++++++++++++++++++++
.../org/apache/pekko/stream/impl/Timers.scala | 15 ++++----
.../org/apache/pekko/stream/javadsl/BidiFlow.scala | 4 +--
.../org/apache/pekko/stream/javadsl/Flow.scala | 16 ++++-----
.../org/apache/pekko/stream/javadsl/Source.scala | 16 ++++-----
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 16 ++++-----
.../apache/pekko/stream/javadsl/SubSource.scala | 16 ++++-----
.../apache/pekko/stream/scaladsl/BidiFlow.scala | 2 +-
.../org/apache/pekko/stream/scaladsl/Flow.scala | 8 ++---
11 files changed, 100 insertions(+), 66 deletions(-)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index c920a8307c..f9a8e2306e 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1364,10 +1364,9 @@ public class FlowTest extends StreamTest {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
- assertEquals(
+ assertTrue(
"A TimeoutException was expected",
- TimeoutException.class,
- executionException.getCause().getClass());
+
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
}
@Test
@@ -1381,10 +1380,9 @@ public class FlowTest extends StreamTest {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
- assertEquals(
+ assertTrue(
"A TimeoutException was expected",
- TimeoutException.class,
- executionException.getCause().getClass());
+
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
}
@Test
@@ -1398,10 +1396,9 @@ public class FlowTest extends StreamTest {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
- assertEquals(
+ assertTrue(
"A TimeoutException was expected",
- TimeoutException.class,
- executionException.getCause().getClass());
+
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
}
@Test
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index f385ecb807..7849921b5d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -55,8 +55,7 @@ import static
org.apache.pekko.stream.testkit.StreamTestKit.PublisherProbeSubscr
import static org.apache.pekko.stream.testkit.TestPublisher.ManualProbe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class SourceTest extends StreamTest {
@@ -1204,10 +1203,9 @@ public class SourceTest extends StreamTest {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
- assertEquals(
+ assertTrue(
"The cause of ExecutionException should be TimeoutException",
- TimeoutException.class,
- exception.getCause().getClass());
+
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
}
@Test
@@ -1222,10 +1220,9 @@ public class SourceTest extends StreamTest {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
- assertEquals(
+ assertTrue(
"The cause of ExecutionException should be TimeoutException",
- TimeoutException.class,
- exception.getCause().getClass());
+
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
}
@Test
@@ -1240,10 +1237,9 @@ public class SourceTest extends StreamTest {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
- assertEquals(
+ assertTrue(
"The cause of ExecutionException should be TimeoutException",
- TimeoutException.class,
- exception.getCause().getClass());
+
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
}
@Test
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala
b/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala
new file mode 100644
index 0000000000..9be91f9967
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import org.apache.pekko
+import pekko.annotation.DoNotInherit
+
+import scala.concurrent.TimeoutException
+import scala.util.control.NoStackTrace
+
+/**
+ * Base class for timeout exceptions specific to Pekko Streams
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+sealed class StreamTimeoutException(msg: String) extends TimeoutException(msg)
with NoStackTrace
+
+final class InitialTimeoutException(msg: String) extends
StreamTimeoutException(msg)
+
+final class CompletionTimeoutException(msg: String) extends
StreamTimeoutException(msg)
+
+final class StreamIdleTimeoutException(msg: String) extends
StreamTimeoutException(msg)
+
+final class BackpressureTimeoutException(msg: String) extends
StreamTimeoutException(msg)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
index dacb2441cc..cdbc42828f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.stream.impl
-import java.util.concurrent.{ TimeUnit, TimeoutException }
+import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{ Duration, FiniteDuration }
@@ -69,12 +69,13 @@ import pekko.stream.stage._
final override protected def onTimer(key: Any): Unit =
if (!initialHasPassed)
- failStage(new TimeoutException(s"The first element has not yet
passed through in $timeout."))
+ failStage(
+ new InitialTimeoutException(s"The first element has not yet
passed through in ${timeout.toCoarsest}."))
override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer,
timeout)
}
- override def toString = "InitialTimeoutTimer"
+ override def toString = "InitialTimeout"
}
@@ -90,7 +91,7 @@ import pekko.stream.stage._
override def onPull(): Unit = pull(in)
final override protected def onTimer(key: Any): Unit =
- failStage(new TimeoutException(s"The stream has not been completed
in $timeout."))
+ failStage(new CompletionTimeoutException(s"The stream has not been
completed in ${timeout.toCoarsest}."))
override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer,
timeout)
}
@@ -117,7 +118,7 @@ import pekko.stream.stage._
final override protected def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
- failStage(new TimeoutException(s"No elements passed in the last
$timeout."))
+ failStage(new StreamIdleTimeoutException(s"No elements passed in
the last ${timeout.toCoarsest}."))
override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer,
timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
@@ -150,7 +151,7 @@ import pekko.stream.stage._
final override protected def onTimer(key: Any): Unit =
if (waitingDemand && (nextDeadline - System.nanoTime < 0))
- failStage(new TimeoutException(s"No demand signalled in the last
$timeout."))
+ failStage(new BackpressureTimeoutException(s"No demand signalled
in the last ${timeout.toCoarsest}."))
override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer,
timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
@@ -179,7 +180,7 @@ import pekko.stream.stage._
final override def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
- failStage(new TimeoutException(s"No elements passed in the last
$timeout."))
+ failStage(new StreamIdleTimeoutException(s"No elements passed in the
last ${timeout.toCoarsest}."))
override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer,
timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
index 2ea80b6b5e..809dbebdca 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala
@@ -97,7 +97,7 @@ object BidiFlow {
/**
* If the time between two processed elements *in any direction* exceed the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]].
*
* There is a difference between this operator and having two idleTimeout
Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this operator will not
fail even though there are elements flowing
@@ -110,7 +110,7 @@ object BidiFlow {
/**
* If the time between two processed elements *in any direction* exceed the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]].
*
* There is a difference between this operator and having two idleTimeout
Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this operator will not
fail even though there are elements flowing
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 0b2d60c58e..948b220544 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -3443,7 +3443,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -3459,7 +3459,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -3475,7 +3475,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -3491,7 +3491,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -3507,7 +3507,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -3524,7 +3524,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -3541,7 +3541,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -3558,7 +3558,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 1f6e80d60d..91cd27aa48 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3997,7 +3997,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -4013,7 +4013,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -4029,7 +4029,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -4045,7 +4045,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -4061,7 +4061,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -4078,7 +4078,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -4095,7 +4095,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -4112,7 +4112,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index a4b2042ec0..25cf5107b1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -2071,7 +2071,7 @@ class SubFlow[In, Out, Mat](
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2087,7 +2087,7 @@ class SubFlow[In, Out, Mat](
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2103,7 +2103,7 @@ class SubFlow[In, Out, Mat](
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2119,7 +2119,7 @@ class SubFlow[In, Out, Mat](
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2135,7 +2135,7 @@ class SubFlow[In, Out, Mat](
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2152,7 +2152,7 @@ class SubFlow[In, Out, Mat](
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2169,7 +2169,7 @@ class SubFlow[In, Out, Mat](
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2186,7 +2186,7 @@ class SubFlow[In, Out, Mat](
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index e0f85c86a1..15f407f069 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -2048,7 +2048,7 @@ class SubSource[Out, Mat](
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2064,7 +2064,7 @@ class SubSource[Out, Mat](
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2080,7 +2080,7 @@ class SubSource[Out, Mat](
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2096,7 +2096,7 @@ class SubSource[Out, Mat](
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[java.util.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2112,7 +2112,7 @@ class SubSource[Out, Mat](
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2129,7 +2129,7 @@ class SubSource[Out, Mat](
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2146,7 +2146,7 @@ class SubSource[Out, Mat](
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2163,7 +2163,7 @@ class SubSource[Out, Mat](
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[java.util.concurrent.TimeoutException]].
The timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
index 7bdbec9dfa..e756f83353 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala
@@ -312,7 +312,7 @@ object BidiFlow {
/**
* If the time between two processed elements *in any direction* exceed the
provided timeout, the stream is failed
- * with a [[scala.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]].
*
* There is a difference between this operator and having two idleTimeout
Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this operator will not
fail even though there are elements flowing
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index ec1586d831..de4c81e493 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -2461,7 +2461,7 @@ trait FlowOps[+Out, +Mat] {
/**
* If the first element has not passed through this operator before the
provided timeout, the stream is failed
- * with a [[scala.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2475,7 +2475,7 @@ trait FlowOps[+Out, +Mat] {
/**
* If the completion of the stream does not happen until the provided
timeout, the stream is failed
- * with a [[scala.concurrent.TimeoutException]].
+ * with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
@@ -2489,7 +2489,7 @@ trait FlowOps[+Out, +Mat] {
/**
* If the time between two processed elements exceeds the provided timeout,
the stream is failed
- * with a [[scala.concurrent.TimeoutException]]. The timeout is checked
periodically,
+ * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The
timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
@@ -2504,7 +2504,7 @@ trait FlowOps[+Out, +Mat] {
/**
* If the time between the emission of an element and the following
downstream demand exceeds the provided timeout,
- * the stream is failed with a [[scala.concurrent.TimeoutException]]. The
timeout is checked periodically,
+ * the stream is failed with a
[[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is
checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]