This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f10ecc24 Avoid silently swallowing the error when SSE stream 
fails/retries (#1205)
0f10ecc24 is described below

commit 0f10ecc24c94c0ebc6a4a7261fe625db9eb2119f
Author: Ryan Wright <[email protected]>
AuthorDate: Wed Sep 24 11:05:13 2025 -0700

    Avoid silently swallowing the error when SSE stream fails/retries (#1205)
    
    * avoid silently swallowing error message when SSE stream fails and retries.
    
    * format
    
    * log from class
    
    * warn
    
    * Update 
sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
    
    Co-authored-by: Arnout Engelen <[email protected]>
    
    * log at info level and explain opinionated design in comments
    
    * Log using a String instead of Object instance type
    
    * remove unused import
    
    ---------
    
    Co-authored-by: He-Pin(kerr) <[email protected]>
    Co-authored-by: Arnout Engelen <[email protected]>
---
 .../connectors/sse/scaladsl/EventSource.scala       | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
 
b/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
index 793d6e0ea..e509a6987 100644
--- 
a/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
+++ 
b/sse/src/main/scala/org/apache/pekko/stream/connectors/sse/scaladsl/EventSource.scala
@@ -17,6 +17,7 @@ package scaladsl
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
+import pekko.event.Logging
 import pekko.http.scaladsl.client.RequestBuilding.Get
 import pekko.http.scaladsl.coding.Coders
 import pekko.http.scaladsl.model.MediaTypes.`text/event-stream`
@@ -38,6 +39,16 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
  * A single source of server-sent events is obtained from the URI. Once 
completed, either normally or by failure, a next
  * one is obtained thereby sending a Last-Event-ID header if available. This 
continues in an endless cycle.
  *
+ * This endless cycle means that if the connection is interrupted, 
reconnection attempts will continue forever. This can
+ * be problematic if, for example, the connection fails due to an oversized 
line or event being received from the
+ * server. By default, an oversized SSE line or event will cause pekko-http to 
fail the stream. If the stream fails,
+ * this connector will establish a new connection and attempt to continue 
processing using Last-Event-ID. Reconsuming
+ * the oversized payload will fail the stream again, causing an infinite retry 
loop. This infinite loop can look like
+ * the connector getting stuck at the same point in the stream. Since the 
opinionated design of this connector is to
+ * retry forever, the connection error will be logged but only at the INFO 
level. You can use optional pekko-http
+ * configuration settings to define alternate handling of oversized SSE lines 
and events instead of failing the stream.
+ * See: `pekko.http.sse.oversized-line-handling` and 
`oversized-event-handling`.
+ *
  * The shape of this processing stage is a source of server-sent events; to 
take effect it must be connected and run.
  * Progress (including termination) is controlled by the connected flow or 
sink, e.g. a retry delay can be implemented
  * by streaming the materialized values of the handler via a throttle.
@@ -99,6 +110,7 @@ object EventSource {
     import EventStreamUnmarshalling.fromEventsStream
     implicit val actorSystem: ActorSystem = system.classicSystem
     import actorSystem.dispatcher
+    val log = Logging(actorSystem, 
"org.apache.pekko.stream.connectors.sse.EventSource")
 
     val continuousEvents = {
       def getEventSource(lastEventId: Option[String]) = {
@@ -111,7 +123,14 @@ object EventSource {
           .flatMap(Unmarshal(_).to[EventSource])
           .fallbackTo(Future.successful(noEvents))
       }
-      def recover(eventSource: EventSource) = 
eventSource.recoverWithRetries(1, { case _ => noEvents })
+      def recover(eventSource: EventSource) = eventSource.recoverWithRetries(1,
+        {
+          case e =>
+            log.info(
+              "SSE Connector is retrying failed stream for: {} Error was: {} 
with message: {}",
+              uri, e.getClass.getName, e.getMessage)
+            noEvents
+        })
       def delimit(eventSource: EventSource) = 
eventSource.concat(singleDelimiter)
       Flow[Option[String]]
         .mapAsync(1)(getEventSource)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to