This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new d548313a4b Add fine-grained stream error logging control (#2805)
d548313a4b is described below
commit d548313a4b31196def8084fe9d88aa191a88dcfb
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 30 20:16:36 2026 +0800
Add fine-grained stream error logging control (#2805)
Add configurable stream stage error log level via
'pekko.stream.materializer.stage-errors-default-log-level' config key.
Supports 'error' (default), 'warning', 'info', 'debug', or 'off'.
When a stage-specific LogLevels attribute is present, it takes
precedence. Otherwise the system-wide default is used, enabling
operators to reduce noise from expected stream errors without
per-stage configuration.
Changes:
- Add stage-errors-default-log-level to reference.conf
- Add LogLevels.defaultErrorLevel and LogLevels.fromString helpers
- Update GraphInterpreter.reportStageError to use per-level logging
- Update RestartFlow.loggingEnabled to respect system-wide setting
- Change NoMaterializer from object to case class accepting ActorSystem
(required because GraphInterpreter now accesses materializer.system)
Upstream: akka/akka-core@519d33d897
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
Co-authored-by: Copilot <[email protected]>
---
.../impl/fusing/GraphInterpreterSpecKit.scala | 7 ++-----
stream/src/main/resources/reference.conf | 5 +++++
.../scala/org/apache/pekko/stream/Attributes.scala | 19 +++++++++++++++++++
.../stream/impl/fusing/GraphInterpreter.scala | 22 +++++++++++++++++-----
.../apache/pekko/stream/scaladsl/RestartFlow.scala | 4 +++-
5 files changed, 46 insertions(+), 11 deletions(-)
diff --git
a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
index 46bf05c5ac..bfe8d92599 100644
---
a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
+++
b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
@@ -40,7 +40,7 @@ import pekko.stream.testkit.Utils.TE
* INTERNAL API
*/
@InternalApi
-private[pekko] object NoMaterializer extends Materializer {
+private[pekko] case class NoMaterializer(override val system: ActorSystem)
extends Materializer {
override def withNamePrefix(name: String): Materializer =
throw new UnsupportedOperationException("NoMaterializer cannot be named")
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
@@ -70,9 +70,6 @@ private[pekko] object NoMaterializer extends Materializer {
override def isShutdown: Boolean = throw new
UnsupportedOperationException("NoMaterializer cannot shutdown")
- override def system: ActorSystem =
- throw new UnsupportedOperationException("NoMaterializer does not have an
actorsystem")
-
override private[pekko] def logger = throw new
UnsupportedOperationException("NoMaterializer does not have a logger")
override private[pekko] def supervisor =
@@ -336,7 +333,7 @@ trait GraphInterpreterSpecKit extends StreamSpec {
}
_interpreter = new GraphInterpreter(
- NoMaterializer,
+ NoMaterializer(system),
logger,
logics,
connections,
diff --git a/stream/src/main/resources/reference.conf
b/stream/src/main/resources/reference.conf
index e2ba88232c..bba71993ec 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -54,6 +54,11 @@ pekko {
# Enable additional troubleshooting logging at DEBUG log level
debug-logging = off
+ # Log any stream stage error at the specified log level: "error",
"warning", "info", "debug" or "off".
+ # If there is a `pekko.stream.Attributes.LogLevels` attribute defined
for a specific stream this value is ignored
+ # and the `onFailure` value of the attribute is applied instead.
+ stage-errors-default-log-level = error
+
# Maximum number of elements emitted in batch if downstream signals
large demand
output-burst-limit = 1000
diff --git a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
index 838eedc52b..4f6d346c31 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
@@ -26,6 +26,7 @@ import scala.reflect.{ classTag, ClassTag }
import scala.util.control.NonFatal
import org.apache.pekko
+import pekko.actor.ActorSystem
import pekko.annotation.ApiMayChange
import pekko.annotation.DoNotInherit
import pekko.annotation.InternalApi
@@ -33,6 +34,7 @@ import pekko.event.Logging
import pekko.japi.function
import pekko.stream.impl.TraversalBuilder
import pekko.util.ByteString
+import pekko.util.Helpers
import pekko.util.LineNumbers
/**
@@ -720,6 +722,23 @@ object Attributes {
/** Use to enable logging at DEBUG level for certain operations when
configuring [[Attributes#logLevels]] */
final val Debug: Logging.LogLevel = Logging.DebugLevel
+
+ /** INTERNAL API */
+ @InternalApi
+ private[pekko] def defaultErrorLevel(system: ActorSystem):
Logging.LogLevel =
+
fromString(system.settings.config.getString("pekko.stream.materializer.stage-errors-default-log-level"))
+
+ /** INTERNAL API */
+ @InternalApi
+ private[pekko] def fromString(str: String): Logging.LogLevel = {
+ Helpers.toRootLowerCase(str) match {
+ case "off" => Off
+ case "error" => Error
+ case "warning" => Warning
+ case "info" => Info
+ case "debug" => Debug
+ }
+ }
}
/** Java API: Use to disable logging on certain operations when configuring
[[Attributes#createLogLevels]] */
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
index 43a05d946b..df3a36a39e 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
@@ -22,6 +22,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
import pekko.annotation.{ InternalApi, InternalStableApi }
+import pekko.event.Logging
import pekko.event.LoggingAdapter
import pekko.stream._
import pekko.stream.Attributes.LogLevels
@@ -244,6 +245,8 @@ import pekko.stream.stage._
private[this] val finalizedMark = Array.fill(logics.length)(false)
private[this] var _subFusingMaterializer: Materializer = _
+ private[this] lazy val defaultErrorReportingLogLevel =
LogLevels.defaultErrorLevel(materializer.system)
+
def subFusingMaterializer: Materializer = _subFusingMaterializer
// An event queue implemented as a circular buffer
@@ -378,12 +381,21 @@ import pekko.stream.stage._
def reportStageError(e: Throwable): Unit = {
if (activeStage eq null) throw e
else {
- val loggingEnabled = activeStage.attributes.get[LogLevels] match {
- case Some(levels) => levels.onFailure != LogLevels.Off
- case None => true
+ val logAt: Logging.LogLevel =
activeStage.attributes.get[LogLevels] match {
+ case Some(levels) => levels.onFailure
+ case None => defaultErrorReportingLogLevel
+ }
+ logAt match {
+ case Logging.ErrorLevel =>
+ log.error(e, "Error in stage [{}]: {}", activeStage.toString,
e.getMessage)
+ case Logging.WarningLevel =>
+ log.warning(e, "Error in stage [{}]: {}",
activeStage.toString, e.getMessage)
+ case Logging.InfoLevel =>
+ log.info("Error in stage [{}]: {}", activeStage.toString,
e.getMessage)
+ case Logging.DebugLevel =>
+ log.debug("Error in stage [{}]: {}", activeStage.toString,
e.getMessage)
+ case _ => // Off, nop
}
- if (loggingEnabled)
- log.error(e, "Error in stage [{}]: {}", activeStage.toString,
e.getMessage)
activeStage.failStage(e)
// Abort chasing
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
index 8a90aa3212..65f2ff9a4c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
@@ -167,7 +167,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
private def loggingEnabled = inheritedAttributes.get[LogLevels] match {
case Some(levels) => levels.onFailure != LogLevels.Off
- case None => true
+ case None =>
+ // Allows for system wide disable at least
+ LogLevels.defaultErrorLevel(materializer.system) != LogLevels.Off
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]