This is an automated email from the ASF dual-hosted git repository.
engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 0f10ecc24 Avoid silently swallowing the error when SSE stream
fails/retries (#1205)
0f10ecc24 is described below
commit 0f10ecc24c94c0ebc6a4a7261fe625db9eb2119f
Author: Ryan Wright <[email protected]>
AuthorDate: Wed Sep 24 11:05:13 2025 -0700
Avoid silently swallowing the error when SSE stream fails/retries (#1205)
* avoid silently swallowing error message when SSE stream fails and retries.
* format
* log from class
* warn
* Update
sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
Co-authored-by: Arnout Engelen <[email protected]>
* log at info level and explain opinionated design in comments
* Log using a String instead of Object instance type
* remove unused import
---------
Co-authored-by: He-Pin(kerr) <[email protected]>
Co-authored-by: Arnout Engelen <[email protected]>
---
.../connectors/sse/scaladsl/EventSource.scala | 21 ++++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
diff --git
a/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
b/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
index 793d6e0ea..e509a6987 100644
---
a/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
+++
b/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
@@ -17,6 +17,7 @@ package scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
+import pekko.event.Logging
import pekko.http.scaladsl.client.RequestBuilding.Get
import pekko.http.scaladsl.coding.Coders
import pekko.http.scaladsl.model.MediaTypes.`text/event-stream`
@@ -38,6 +39,16 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
* A single source of server-sent events is obtained from the URI. Once
completed, either normally or by failure, a next
* one is obtained thereby sending a Last-Event-ID header if available. This
continues in an endless cycle.
*
+ * This endless cycle means that if the connection is interrupted,
reconnection attempts will continue forever. This can
+ * be problematic if, for example, the connection fails due to an oversized
line or event being received from the
+ * server. By default, an oversized SSE line or event will cause pekko-http to
fail the stream. If the stream fails,
+ * this connector will establish a new connection and attempt to continue
processing using Last-Event-ID. Reconsuming
+ * the oversized payload will fail the stream again, causing an infinite retry
loop. This infinite loop can look like
+ * the connector getting stuck at the same point in the stream. Since the
opinionated design of this connector is to
+ * retry forever, the connection error will be logged but only at the INFO
level. You can use optional pekko-http
+ * configuration settings to define alternate handling of oversized SSE lines
and events instead of failing the stream.
+ * See: `pekko.http.sse.oversized-line-handling` and
`oversized-event-handling`.
+ *
* The shape of this processing stage is a source of server-sent events; to
take effect it must be connected and run.
* Progress (including termination) is controlled by the connected flow or
sink, e.g. a retry delay can be implemented
* by streaming the materialized values of the handler via a throttle.
@@ -99,6 +110,7 @@ object EventSource {
import EventStreamUnmarshalling.fromEventsStream
implicit val actorSystem: ActorSystem = system.classicSystem
import actorSystem.dispatcher
+ val log = Logging(actorSystem,
"org.apache.pekko.stream.connectors.sse.EventSource")
val continuousEvents = {
def getEventSource(lastEventId: Option[String]) = {
@@ -111,7 +123,14 @@ object EventSource {
.flatMap(Unmarshal(_).to[EventSource])
.fallbackTo(Future.successful(noEvents))
}
- def recover(eventSource: EventSource) =
eventSource.recoverWithRetries(1, { case _ => noEvents })
+ def recover(eventSource: EventSource) = eventSource.recoverWithRetries(1,
+ {
+ case e =>
+ log.info(
+ "SSE Connector is retrying failed stream for: {} Error was: {}
with message: {}",
+ uri, e.getClass.getName, e.getMessage)
+ noEvents
+ })
def delimit(eventSource: EventSource) =
eventSource.concat(singleDelimiter)
Flow[Option[String]]
.mapAsync(1)(getEventSource)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]