This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 7415a252d8 Improve ConditionalLogger (#6159)
7415a252d8 is described below
commit 7415a252d8639e48cd24e593cc76406427a3cf79
Author: Christopher Tubbs <[email protected]>
AuthorDate: Fri Feb 27 15:58:14 2026 -0500
Improve ConditionalLogger (#6159)
* Use a slightly smaller/simpler implementation
* Do not rely on slf4j 2.0 APIs
* Do not extend AbstractLogger or rely on its implementation
* Replace BiFunction with BiPredicate
* Avoid a serialization issue flagged by the Java 21 compiler:
ConditionalLogger extended AbstractLogger, which is serializable, but it
had a non-transient, non-serializable field (condition), which made
ConditionalLogger itself not serializable
---
.../accumulo/core/logging/ConditionalLogger.java | 167 +++++++++------------
.../spi/balancer/HostRegexTableLoadBalancer.java | 5 +-
.../core/logging/DeduplicatingLoggerTest.java | 2 +-
.../core/logging/EscalatingLoggerTest.java | 5 +-
.../coordinator/CompactionCoordinator.java | 13 +-
.../accumulo/manager/TabletGroupWatcher.java | 5 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +-
7 files changed, 88 insertions(+), 111 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
index 6da6454f06..d82221a099 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
@@ -18,17 +18,16 @@
*/
package org.apache.accumulo.core.logging;
+import static java.util.Objects.requireNonNull;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
-import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
-import org.slf4j.Marker;
-import org.slf4j.event.Level;
-import org.slf4j.helpers.AbstractLogger;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -37,9 +36,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
* Logger that wraps another Logger and only emits a log message once per the
supplied duration.
*
*/
-public abstract class ConditionalLogger extends AbstractLogger {
-
- private static final long serialVersionUID = 1L;
+public abstract class ConditionalLogger {
/**
* A Logger implementation that will log a message at the supplied elevated
level if it has not
@@ -49,30 +46,37 @@ public abstract class ConditionalLogger extends
AbstractLogger {
*/
public static class EscalatingLogger extends DeduplicatingLogger {
- private static final long serialVersionUID = 1L;
- private final Level elevatedLevel;
+ private final ConditionalLogAction elevatedLogAction;
public EscalatingLogger(Logger log, Duration threshold, long
maxCachedLogMessages,
- Level elevatedLevel) {
+ ConditionalLogAction elevatedLogAction) {
super(log, threshold, maxCachedLogMessages);
- this.elevatedLevel = elevatedLevel;
+ this.elevatedLogAction = requireNonNull(elevatedLogAction);
}
@Override
- protected void handleNormalizedLoggingCall(Level level, Marker marker,
String messagePattern,
- Object[] arguments, Throwable throwable) {
+ public void trace(String format, Object... arguments) {
+ log(elevatedLogAction, Logger::trace, format, arguments);
+ }
- if (arguments == null) {
- arguments = new Object[0];
- }
- if (!condition.apply(messagePattern, Arrays.asList(arguments))) {
-
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern,
- arguments);
- } else {
-
delegate.atLevel(elevatedLevel).addMarker(marker).setCause(throwable).log(messagePattern,
- arguments);
- }
+ @Override
+ public void debug(String format, Object... arguments) {
+ log(elevatedLogAction, Logger::debug, format, arguments);
+ }
+
+ @Override
+ public void info(String format, Object... arguments) {
+ log(elevatedLogAction, Logger::info, format, arguments);
+ }
+ @Override
+ public void warn(String format, Object... arguments) {
+ log(elevatedLogAction, Logger::warn, format, arguments);
+ }
+
+ @Override
+ public void error(String format, Object... arguments) {
+ log(elevatedLogAction, Logger::error, format, arguments);
}
}
@@ -82,10 +86,8 @@ public abstract class ConditionalLogger extends
AbstractLogger {
*/
public static class DeduplicatingLogger extends ConditionalLogger {
- private static final long serialVersionUID = 1L;
-
public DeduplicatingLogger(Logger log, Duration threshold, long
maxCachedLogMessages) {
- super(log, new BiFunction<>() {
+ super(log, new BiPredicate<>() {
private final Cache<Pair<String,List<Object>>,Boolean> cache =
Caffeine.newBuilder()
.expireAfterWrite(threshold).maximumSize(maxCachedLogMessages).build();
@@ -93,14 +95,20 @@ public abstract class ConditionalLogger extends
AbstractLogger {
private final ConcurrentMap<Pair<String,List<Object>>,Boolean>
cacheMap = cache.asMap();
/**
- * Function that will return true if the message has not been seen in
the supplied duration.
+ * Function that will return true if the message with the provided
arguments (minus any
+ * included Throwable as the last argument) has not been seen in the
supplied duration.
+ * Deduplication will only work if the arguments are of types that
implement meaningful
+ * equals. This is not generally true of Throwables.
*
* @param msg log message
* @param args log message arguments
* @return true if message has not been seen in duration, else false.
*/
@Override
- public Boolean apply(String msg, List<Object> args) {
+ public boolean test(String msg, List<Object> args) {
+ if (!args.isEmpty() && args.get(args.size() - 1) instanceof
Throwable) {
+ args = args.subList(0, args.size() - 1);
+ }
return cacheMap.putIfAbsent(new Pair<>(msg, args), true) == null;
}
@@ -110,85 +118,60 @@ public abstract class ConditionalLogger extends
AbstractLogger {
}
protected final Logger delegate;
- protected final BiFunction<String,List<Object>,Boolean> condition;
-
- protected ConditionalLogger(Logger log,
BiFunction<String,List<Object>,Boolean> condition) {
- // this.delegate = new DelegateWrapper(log);
- this.delegate = log;
- this.condition = condition;
- }
-
- @Override
- public boolean isTraceEnabled() {
- return this.delegate.isTraceEnabled();
- }
-
- @Override
- public boolean isTraceEnabled(Marker marker) {
- return this.delegate.isTraceEnabled(marker);
- }
-
- @Override
- public boolean isDebugEnabled() {
- return this.delegate.isDebugEnabled();
- }
-
- @Override
- public boolean isDebugEnabled(Marker marker) {
- return this.delegate.isDebugEnabled(marker);
- }
+ protected final BiPredicate<String,List<Object>> condition;
- @Override
- public boolean isInfoEnabled() {
- return this.delegate.isInfoEnabled();
+ protected ConditionalLogger(Logger log, BiPredicate<String,List<Object>>
condition) {
+ this.delegate = requireNonNull(log);
+ this.condition = requireNonNull(condition);
}
- @Override
- public boolean isInfoEnabled(Marker marker) {
- return this.delegate.isInfoEnabled(marker);
+ @FunctionalInterface
+ public interface ConditionalLogAction {
+ void log(Logger logger, String format, Object... arguments);
}
- @Override
- public boolean isWarnEnabled() {
- return this.delegate.isWarnEnabled();
- }
-
- @Override
- public boolean isWarnEnabled(Marker marker) {
- return this.delegate.isWarnEnabled(marker);
+ /**
+ * Conditionally executes the log action with the provided format string and
arguments
+ *
+ * @param conditionTrueLogAction the log action to execute (e.g.
Logger::warn, Logger::debug,
+ * etc.) when the condition is true (optional, may be null)
+ * @param conditionFalseLogAction the log action to execute (e.g.
Logger::warn, Logger::debug,
+ * etc.) when the condition is false (optional, may be null)
+ * @param format the message format String for the logger
+ * @param arguments the arguments to the format String
+ */
+ protected final void log(ConditionalLogAction conditionTrueLogAction,
+ ConditionalLogAction conditionFalseLogAction, String format, Object...
arguments) {
+ if (arguments == null) {
+ arguments = new Object[0];
+ }
+ if (condition.test(format, Arrays.asList(arguments))) {
+ if (conditionTrueLogAction != null) {
+ conditionTrueLogAction.log(delegate, format, arguments);
+ }
+ } else if (conditionFalseLogAction != null) {
+ conditionFalseLogAction.log(delegate, format, arguments);
+ }
}
- @Override
- public boolean isErrorEnabled() {
- return this.delegate.isErrorEnabled();
+ public void trace(String format, Object... arguments) {
+ log(Logger::trace, null, format, arguments);
}
- @Override
- public boolean isErrorEnabled(Marker marker) {
- return this.delegate.isErrorEnabled(marker);
+ public void debug(String format, Object... arguments) {
+ log(Logger::debug, null, format, arguments);
}
- @Override
- public String getName() {
- return this.delegate.getName();
+ public void info(String format, Object... arguments) {
+ log(Logger::info, null, format, arguments);
}
- @Override
- protected String getFullyQualifiedCallerName() {
- return this.delegate.getName();
+ public void warn(String format, Object... arguments) {
+ log(Logger::warn, null, format, arguments);
}
- @Override
- protected void handleNormalizedLoggingCall(Level level, Marker marker,
String messagePattern,
- Object[] arguments, Throwable throwable) {
-
- if (arguments == null) {
- arguments = new Object[0];
- }
- if (condition.apply(messagePattern, Arrays.asList(arguments))) {
-
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern,
arguments);
- }
-
+ public void error(String format, Object... arguments) {
+ log(Logger::error, null, format, arguments);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index 4637d8c610..49309dc352 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@ -62,7 +62,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -102,8 +101,8 @@ public class HostRegexTableLoadBalancer extends
TableLoadBalancer {
private static final String PROP_PREFIX =
Property.TABLE_ARBITRARY_PROP_PREFIX.getKey();
private static final Logger LOG =
LoggerFactory.getLogger(HostRegexTableLoadBalancer.class);
- private static final Logger MIGRATIONS_LOGGER =
- new EscalatingLogger(LOG, Duration.ofMinutes(5), 1000, Level.WARN);
+ private static final EscalatingLogger MIGRATIONS_LOGGER =
+ new EscalatingLogger(LOG, Duration.ofMinutes(5), 1000, Logger::warn);
public static final String HOST_BALANCER_PREFIX = PROP_PREFIX +
"balancer.host.regex.";
public static final String HOST_BALANCER_OOB_CHECK_KEY =
PROP_PREFIX + "balancer.host.regex.oob.period";
diff --git
a/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
b/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
index 826dfed144..e3fe83d7f9 100644
---
a/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class DeduplicatingLoggerTest {
private static final Logger LOG =
LoggerFactory.getLogger(DeduplicatingLoggerTest.class);
- private static final Logger TEST_LOGGER =
+ private static final DeduplicatingLogger TEST_LOGGER =
new DeduplicatingLogger(LOG, Duration.ofMinutes(1), 100);
@Test
diff --git
a/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
b/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
index cd368e38a4..a119601cf0 100644
---
a/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
@@ -33,13 +33,12 @@ import org.apache.logging.log4j.core.layout.PatternLayout;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
public class EscalatingLoggerTest {
private static final Logger LOG =
LoggerFactory.getLogger(EscalatingLoggerTest.class);
- private static final Logger TEST_LOGGER =
- new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Level.WARN);
+ private static final EscalatingLogger TEST_LOGGER =
+ new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Logger::warn);
@Test
public void test() throws InterruptedException {
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 387b1559b3..dcbdc0e2de 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -61,6 +61,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -96,7 +97,6 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -705,16 +705,13 @@ public class CompactionCoordinator extends AbstractServer
implements
for (var key : failureCounts.keySet()) {
failureCounts.compute(key, (k, counts) -> {
if (counts != null) {
- Level level;
+ ConditionalLogAction logAction = Logger::debug;
if (counts.failures > 0) {
- level = Level.WARN;
+ logAction = Logger::warn;
} else if (logSuccessAtTrace) {
- level = Level.TRACE;
- } else {
- level = Level.DEBUG;
+ logAction = Logger::trace;
}
-
- LOG.atLevel(level).log("{} {} failures:{} successes:{} since last
time this was logged ",
+ logAction.log(LOG, "{} {} failures:{} successes:{} since last time
this was logged ",
logPrefix, k, counts.failures, counts.successes);
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 52b398d98e..45f55169a6 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -106,7 +106,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.slf4j.Logger;
-import org.slf4j.event.Level;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
@@ -115,8 +114,8 @@ import com.google.common.collect.Sets;
abstract class TabletGroupWatcher extends AccumuloDaemonThread {
- private static final Logger TABLET_UNLOAD_LOGGER =
- new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000,
Level.INFO);
+ private static final EscalatingLogger TABLET_UNLOAD_LOGGER =
+ new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000,
Logger::info);
private final Manager manager;
private final TabletStateStore store;
private final TabletGroupWatcher dependentWatcher;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 76f5088dbb..1b2e5d6100 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -146,7 +146,7 @@ import io.opentelemetry.context.Scope;
*/
public class Tablet extends TabletBase {
private static final Logger log = LoggerFactory.getLogger(Tablet.class);
- private static final Logger CLOSING_STUCK_LOGGER =
+ private static final DeduplicatingLogger CLOSING_STUCK_LOGGER =
new DeduplicatingLogger(log, Duration.ofMinutes(5), 1000);
private final TabletServer tabletServer;