This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 53167ff6b03 [fix][broker] EntryFilters fix NoClassDefFoundError due to
closed classloader (#22767)
53167ff6b03 is described below
commit 53167ff6b039d6ff69cae43501ee66983e2920e4
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed May 29 17:27:00 2024 +0200
[fix][broker] EntryFilters fix NoClassDefFoundError due to closed
classloader (#22767)
---
.../broker/service/plugin/EntryFilterProvider.java | 3 ++-
.../service/plugin/EntryFilterWithClassLoader.java | 29 +++++++++++++++++-----
.../broker/service/plugin/FilterEntryTest.java | 12 ++++-----
.../pulsar/broker/stats/ConsumerStatsTest.java | 2 +-
.../pulsar/broker/stats/SubscriptionStatsTest.java | 2 +-
.../apache/pulsar/common/nar/NarClassLoader.java | 16 ++++++++++++
6 files changed, 49 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
index f93e561542e..53418744b54 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -197,7 +197,8 @@ public class EntryFilterProvider implements AutoCloseable {
+ " does not implement entry filter interface");
}
EntryFilter pi = (EntryFilter) filter;
- return new EntryFilterWithClassLoader(pi, ncl);
+ // the classloader is shared with the broker, the instance doesn't
own it
+ return new EntryFilterWithClassLoader(pi, ncl, false);
} catch (Throwable e) {
if (e instanceof IOException) {
throw (IOException) e;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
index c5c57210877..aab46c62acd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
@@ -30,15 +30,23 @@ import org.apache.pulsar.common.nar.NarClassLoader;
public class EntryFilterWithClassLoader implements EntryFilter {
private final EntryFilter entryFilter;
private final NarClassLoader classLoader;
+ private final boolean classLoaderOwned;
- public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader
classLoader) {
+ public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader
classLoader, boolean classLoaderOwned) {
this.entryFilter = entryFilter;
this.classLoader = classLoader;
+ this.classLoaderOwned = classLoaderOwned;
}
@Override
public FilterResult filterEntry(Entry entry, FilterContext context) {
- return entryFilter.filterEntry(entry, context);
+ ClassLoader currentClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ return entryFilter.filterEntry(entry, context);
+ } finally {
+ Thread.currentThread().setContextClassLoader(currentClassLoader);
+ }
}
@VisibleForTesting
@@ -48,11 +56,20 @@ public class EntryFilterWithClassLoader implements
EntryFilter {
@Override
public void close() {
- entryFilter.close();
+ ClassLoader currentClassLoader =
Thread.currentThread().getContextClassLoader();
try {
- classLoader.close();
- } catch (IOException e) {
- log.error("close EntryFilterWithClassLoader failed", e);
+ Thread.currentThread().setContextClassLoader(classLoader);
+ entryFilter.close();
+ } finally {
+ Thread.currentThread().setContextClassLoader(currentClassLoader);
+ }
+ if (classLoaderOwned) {
+ log.info("Closing classloader {} for EntryFilter {}", classLoader,
entryFilter.getClass().getName());
+ try {
+ classLoader.close();
+ } catch (IOException e) {
+ log.error("close EntryFilterWithClassLoader failed", e);
+ }
}
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 7b3daddcd9d..f7388ef9eb9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -239,9 +239,9 @@ public class FilterEntryTest extends BrokerTestBase {
hasFilterField.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
- EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class,
filter1, narClassLoader);
+ EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class,
filter1, narClassLoader, false);
EntryFilter filter2 = new EntryFilter2Test();
- EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class,
filter2, narClassLoader);
+ EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class,
filter2, narClassLoader, false);
field.set(dispatcher, List.of(loader1, loader2));
hasFilterField.set(dispatcher, true);
@@ -371,9 +371,9 @@ public class FilterEntryTest extends BrokerTestBase {
hasFilterField.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
- EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader);
+ EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader, false);
EntryFilter filter2 = new EntryFilter2Test();
- EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2,
narClassLoader);
+ EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2,
narClassLoader, false);
field.set(dispatcher, List.of(loader1, loader2));
hasFilterField.set(dispatcher, true);
@@ -463,10 +463,10 @@ public class FilterEntryTest extends BrokerTestBase {
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 =
-
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader);
+
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader, false);
EntryFilter filter2 = new EntryFilterTest();
EntryFilterWithClassLoader loader2 =
-
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2,
narClassLoader);
+
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2,
narClassLoader, false);
field.set(dispatcher, List.of(loader1, loader2));
hasFilterField.set(dispatcher, true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index de65d5db564..f9756c38305 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -407,7 +407,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
EntryFilter filter = new EntryFilterProducerTest();
EntryFilterWithClassLoader
loader =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
- narClassLoader);
+ narClassLoader, false);
Pair<String, List<EntryFilter>> entryFilters = Pair.of("filter",
List.of(loader));
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index 83e6f43cbaf..910f1e6d68f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -206,7 +206,7 @@ public class SubscriptionStatsTest extends
ProducerConsumerBase {
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
EntryFilterWithClassLoader loader1 =
-
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader);
+
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader, false);
field.set(dispatcher, List.of(loader1));
hasFilterField.set(dispatcher, true);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 9736d8b47ef..44cfc2872ef 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -135,6 +136,7 @@ public class NarClassLoader extends URLClassLoader {
* The NAR for which this <tt>ClassLoader</tt> is responsible.
*/
private final File narWorkingDirectory;
+ private final AtomicBoolean closed = new AtomicBoolean();
private static final String TMP_DIR_PREFIX = "pulsar-nar";
@@ -292,4 +294,18 @@ public class NarClassLoader extends URLClassLoader {
public String toString() {
return NarClassLoader.class.getName() + "[" +
narWorkingDirectory.getPath() + "]";
}
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
+ if (closed.get()) {
+ log.warn("Loading class {} from a closed classloader ({})", name,
this);
+ }
+ return super.loadClass(name, resolve);
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed.set(true);
+ super.close();
+ }
}