This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0ea0e2bd0752796ad24f4d377edac3919290814a
Author: Neng Lu <[email protected]>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700

    [Function] provide default error handler for function log appender (#15728)

    (cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
 .../pulsar/functions/instance/LogAppender.java | 25 ++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index bbca3f9efa1..46b90e54381 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -19,7 +19,12 @@
 package org.apache.pulsar.functions.instance;
 
 import java.nio.charset.StandardCharsets;
-import org.apache.logging.log4j.core.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.ErrorHandler;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -32,6 +37,11 @@ import java.util.concurrent.TimeUnit;
  * to a log topic.
  */
 public class LogAppender implements Appender {
+
+    private static final String LOG_LEVEL = "loglevel";
+    private static final String INSTANCE = "instance";
+    private static final String FQN = "fqn";
+
     private PulsarClient pulsarClient;
     private String logTopic;
     private String fqn;
@@ -45,15 +55,16 @@ public class LogAppender implements Appender {
         this.logTopic = logTopic;
         this.fqn = fqn;
         this.instance = instance;
+        this.errorHandler = new DefaultErrorHandler(this);
     }
 
     @Override
     public void append(LogEvent logEvent) {
         producer.newMessage()
                 .value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
-                .property("loglevel", logEvent.getLevel().name())
-                .property("instance", instance)
-                .property("fqn", fqn)
+                .property(LOG_LEVEL, logEvent.getLevel().name())
+                .property(INSTANCE, instance)
+                .property(FQN, fqn)
                 .sendAsync();
     }
 
@@ -79,6 +90,12 @@ public class LogAppender implements Appender {
 
     @Override
     public void setHandler(ErrorHandler errorHandler) {
+        if (errorHandler == null) {
+            throw new RuntimeException("The log error handler cannot be set to null");
+        }
+        if (isStarted()) {
+            throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+        }
         this.errorHandler = errorHandler;
     }
