This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49087fa4fb763339a40cffe592fbcc774765b1ba
Author: Neng Lu <[email protected]>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700

    [Function] provide default error handler for function log appender (#15728)

    (cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
 .../apache/pulsar/functions/instance/LogAppender.java | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index 5250e4c3cef..956717975e5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.ErrorHandler;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -35,6 +36,11 @@ import java.util.concurrent.TimeUnit;
  * to a log topic.
  */
 public class LogAppender implements Appender {
+
+    private static final String LOG_LEVEL = "loglevel";
+    private static final String INSTANCE = "instance";
+    private static final String FQN = "fqn";
+
     private PulsarClient pulsarClient;
     private String logTopic;
     private String fqn;
@@ -48,15 +54,16 @@ public class LogAppender implements Appender {
         this.logTopic = logTopic;
         this.fqn = fqn;
         this.instance = instance;
+        this.errorHandler = new DefaultErrorHandler(this);
     }
 
     @Override
     public void append(LogEvent logEvent) {
         producer.newMessage()
                 .value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
-                .property("loglevel", logEvent.getLevel().name())
-                .property("instance", instance)
-                .property("fqn", fqn)
+                .property(LOG_LEVEL, logEvent.getLevel().name())
+                .property(INSTANCE, instance)
+                .property(FQN, fqn)
                 .sendAsync();
     }
 
@@ -82,6 +89,12 @@ public class LogAppender implements Appender {
 
     @Override
     public void setHandler(ErrorHandler errorHandler) {
+        if (errorHandler == null) {
+            throw new RuntimeException("The log error handler cannot be set to null");
+        }
+        if (isStarted()) {
+            throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+        }
         this.errorHandler = errorHandler;
     }
