This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5a2f9ea0384f5e2e4b579443d157336853b450f9 Author: Michael Marshall <47911938+michaeljmarsh...@users.noreply.github.com> AuthorDate: Thu Apr 1 13:07:32 2021 -0600 [Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878) In Pulsar 2.7.0, there is a class loader leak. It looks like https://github.com/apache/pulsar/pull/8739 fixed the leak by only loading the offloader classes for the directory configured in `broker.conf`. However, the solution in https://github.com/apache/pulsar/pull/8739 ignores the fact that an offload policy can override the offloaders directory. As such, there could be a regression in 2.7.1 if users are providing multiple offload directories. This PR returns the functionality without reintroducing the class loader leak. Update the `PulsarService` and the `PulsarConnectorCache` classes to use a map from directory strings to `Offloaders`. The new `Map` has keys of type `String`, but we could use keys of type `Path` and then normalize the paths to ensure that `./offloaders` and `offloaders` result in a single class loader. However, it looks like the `normalize` method in the path class has a warning about symbolic links. As such, I went with the basic `String` approach, which might lead to some duplication of loaded classes. Below is the javadoc for `normalize`, in case that helps for a design decision. ```java /** * Returns a path that is this path with redundant name elements eliminated. * * <p> The precise definition of this method is implementation dependent but * in general it derives from this path, a path that does not contain * <em>redundant</em> name elements. In many file systems, the "{@code .}" * and "{@code ..}" are special names used to indicate the current directory * and parent directory. In such file systems all occurrences of "{@code .}" * are considered redundant. 
If a "{@code ..}" is preceded by a * non-"{@code ..}" name then both names are considered redundant (the * process to identify such names is repeated until it is no longer * applicable). * * <p> This method does not access the file system; the path may not locate * a file that exists. Eliminating "{@code ..}" and a preceding name from a * path may result in the path that locates a different file than the original * path. This can arise when the preceding name is a symbolic link. * * @return the resulting path or this path if it does not contain * redundant name elements; an empty path is returned if this path * does have a root component and all name elements are redundant * * @see #getParent * @see #toRealPath */ Path normalize(); ``` This change is a code cleanup without any test coverage that should be covered by other tests. If required, I can create some tests. (cherry picked from commit 6c3ebbb01415cdfe094650ae0eeeea6dcc224e87) --- .../bookkeeper/mledger/offload/OffloaderUtils.java | 4 +- .../mledger/offload/OffloadersCache.java | 68 ++++++++++++++++++++++ .../mledger/offload/OffloadersCacheTest.java | 62 ++++++++++++++++++++ .../org/apache/pulsar/broker/PulsarService.java | 13 +++-- .../pulsar/sql/presto/PulsarConnectorCache.java | 11 ++-- 5 files changed, 144 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java index 5243691..bc747d7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java @@ -114,8 +114,8 @@ public class OffloaderUtils { } } - public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException { - Path path = Paths.get(connectorsDirectory).toAbsolutePath(); + public static Offloaders 
searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException { + Path path = Paths.get(offloadersPath).toAbsolutePath(); log.info("Searching for offloaders in {}", path); Offloaders offloaders = new Offloaders(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java new file mode 100644 index 0000000..e80c75b --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of an Offloaders. The main purpose of this class is to + * ensure that an Offloaders directory is only loaded once. 
+ */ +@Slf4j +public class OffloadersCache implements AutoCloseable { + + private Map<String, Offloaders> loadedOffloaders = new ConcurrentHashMap<>(); + + /** + * Method to load an Offloaders directory or to get an already loaded Offloaders directory. + * + * @param offloadersPath - the directory to search the offloaders nar files + * @param narExtractionDirectory - the directory to use for extraction + * @return the loaded offloaders class + * @throws IOException when fail to retrieve the pulsar offloader class + */ + public Offloaders getOrLoadOffloaders(String offloadersPath, String narExtractionDirectory) { + return loadedOffloaders.computeIfAbsent(offloadersPath, + (directory) -> { + try { + return OffloaderUtils.searchForOffloaders(directory, narExtractionDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void close() { + loadedOffloaders.values().forEach(offloaders -> { + try { + offloaders.close(); + } catch (Exception e) { + log.error("Error while closing offloader.", e); + // Even if the offloader fails to close, the graceful shutdown process continues + } + }); + // Don't want to hold on to references to closed offloaders + loadedOffloaders.clear(); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java new file mode 100644 index 0000000..1c2cd85 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.eq; +import static org.testng.Assert.assertSame; + +@PrepareForTest({OffloaderUtils.class}) +@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.pulsar.common.nar.*"}) +public class OffloadersCacheTest { + + // Necessary to make PowerMockito.mockStatic work with TestNG. 
+ @ObjectFactory + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @Test + public void testLoadsOnlyOnce() throws Exception { + Offloaders expectedOffloaders = new Offloaders(); + + PowerMockito.mockStatic(OffloaderUtils.class); + PowerMockito.when(OffloaderUtils.searchForOffloaders(eq("./offloaders"), eq("/tmp"))) + .thenReturn(expectedOffloaders); + + OffloadersCache cache = new OffloadersCache(); + + // Call a first time to load the offloader + Offloaders offloaders1 = cache.getOrLoadOffloaders("./offloaders", "/tmp"); + + assertSame(offloaders1, expectedOffloaders, "The offloaders should be the mocked one."); + + // Call a second time to get the stored offlaoder + Offloaders offloaders2 = cache.getOrLoadOffloaders("./offloaders", "/tmp"); + + assertSame(offloaders2, expectedOffloaders, "The offloaders should be the mocked one."); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index fcabff2..ef8df6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -66,9 +66,9 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; -import org.apache.bookkeeper.mledger.offload.OffloaderUtils; import org.apache.bookkeeper.mledger.offload.Offloaders; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.mledger.offload.OffloadersCache; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; @@ -183,7 +183,7 @@ public class PulsarService implements AutoCloseable { 
private final ScheduledExecutorService loadManagerExecutor; private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; - private Offloaders offloaderManager = new Offloaders(); + private OffloadersCache offloadersCache = new OffloadersCache(); private LedgerOffloader defaultOffloader; private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>(); private ScheduledFuture<?> loadReportTask = null; @@ -370,7 +370,7 @@ public class PulsarService implements AutoCloseable { schemaRegistryService.close(); } - offloaderManager.close(); + offloadersCache.close(); if (protocolHandlers != null) { protocolHandlers.close(); @@ -482,8 +482,6 @@ public class PulsarService implements AutoCloseable { schemaRegistryService = SchemaRegistryService.create( schemaStorage, config.getSchemaRegistryCompatibilityCheckers()); - this.offloaderManager = OffloaderUtils.searchForOffloaders( - config.getOffloadersDirectory(), config.getNarExtractionDirectory()); this.defaultOffloader = createManagedLedgerOffloader( OffloadPolicies.create(this.getConfiguration().getProperties())); this.brokerInterceptor = BrokerInterceptors.load(config); @@ -932,7 +930,10 @@ public class PulsarService implements AutoCloseable { checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPolicies.getManagedLedgerOffloadDriver()); - LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( + Offloaders offloaders = offloadersCache.getOrLoadOffloaders( + offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory()); + + LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory( offloadPolicies.getManagedLedgerOffloadDriver()); try { return offloaderFactory.create( diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index c10312a..757995a 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -35,8 +35,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; -import org.apache.bookkeeper.mledger.offload.OffloaderUtils; import org.apache.bookkeeper.mledger.offload.Offloaders; +import org.apache.bookkeeper.mledger.offload.OffloadersCache; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; @@ -57,7 +57,7 @@ public class PulsarConnectorCache { private final StatsProvider statsProvider; private OrderedScheduler offloaderScheduler; - private Offloaders offloaderManager; + private OffloadersCache offloadersCache = new OffloadersCache(); private LedgerOffloader defaultOffloader; private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>(); @@ -155,9 +155,9 @@ public class PulsarConnectorCache { checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPolicies.getManagedLedgerOffloadDriver()); - this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(), + Offloaders offloaders = offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(), pulsarConnectorConfig.getNarExtractionDirectory()); - LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( + LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory( 
offloadPolicies.getManagedLedgerOffloadDriver()); try { @@ -195,8 +195,7 @@ public class PulsarConnectorCache { instance.statsProvider.stop(); instance.managedLedgerFactory.shutdown(); instance.offloaderScheduler.shutdown(); - instance.offloaderManager.close(); - instance = null; + instance.offloadersCache.close(); } } }