This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9d813d0e3f04f683afe95aef2937b24a52d6aacd Author: Gabriel Miklós <[email protected]> AuthorDate: Thu May 18 17:23:55 2023 +0200 [improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115) --- .../org/apache/pulsar/functions/instance/ContextImpl.java | 2 +- .../java/org/apache/pulsar/functions/sink/PulsarSink.java | 2 +- .../org/apache/pulsar/functions/source/PulsarSource.java | 6 +++--- .../functions/source/SingleConsumerPulsarSource.java | 11 ++++------- .../org/apache/pulsar/functions/source/TopicSchema.java | 15 ++++++++++++--- .../apache/pulsar/functions/source/TopicSchemaTest.java | 8 +++++--- .../tests/integration/functions/PulsarFunctionsTest.java | 8 ++++---- .../integration/functions/PulsarFunctionsTestBase.java | 2 +- .../functions/java/PulsarFunctionsJavaTest.java | 11 +++++++---- .../functions/java/PulsarWorkerRebalanceDrainTest.java | 15 +++++++++------ .../integration/functions/utils/CommandGenerator.java | 10 +++++----- 11 files changed, 52 insertions(+), 38 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 914275eb163..8f4fef293bd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -148,7 +148,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable this.clientBuilder = clientBuilder; this.client = client; this.pulsarAdmin = pulsarAdmin; - this.topicSchema = new TopicSchema(client); + this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader()); this.statsManager = statsManager; this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index cdd662bc75f..a3bc52c98c5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -345,7 +345,7 @@ public class PulsarSink<T> implements Sink<T> { ComponentStatsManager stats, ClassLoader functionClassLoader) { this.client = client; this.pulsarSinkConfig = pulsarSinkConfig; - this.topicSchema = new TopicSchema(client); + this.topicSchema = new TopicSchema(client, functionClassLoader); this.properties = properties; this.stats = stats; this.functionClassLoader = functionClassLoader; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 1fb76459e60..18b541ebe59 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -53,7 +53,7 @@ public abstract class PulsarSource<T> implements Source<T> { ClassLoader functionClassLoader) { this.pulsarClient = pulsarClient; this.pulsarSourceConfig = pulsarSourceConfig; - this.topicSchema = new TopicSchema(pulsarClient); + this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader); this.properties = properties; this.functionClassLoader = functionClassLoader; } @@ -168,8 +168,8 @@ public abstract class PulsarSource<T> implements Source<T> { Class<?> typeArg) { PulsarSourceConsumerConfig.PulsarSourceConsumerConfigBuilder<T> consumerConfBuilder = PulsarSourceConsumerConfig.<T>builder().isRegexPattern(conf.isRegexPattern()) - .receiverQueueSize(conf.getReceiverQueueSize()) - .consumerProperties(conf.getConsumerProperties()); + .receiverQueueSize(conf.getReceiverQueueSize()) + .consumerProperties(conf.getConsumerProperties()); Schema<T> schema; if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java index 885277351b1..041f601ad2e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java @@ -43,14 +43,12 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> { private Consumer<T> consumer; private final List<Consumer<T>> inputConsumers = new LinkedList<>(); - public SingleConsumerPulsarSource(PulsarClient pulsarClient, - SingleConsumerPulsarSourceConfig pulsarSourceConfig, - Map<String, String> properties, - ClassLoader functionClassLoader) { + public SingleConsumerPulsarSource(PulsarClient pulsarClient, SingleConsumerPulsarSourceConfig pulsarSourceConfig, + Map<String, String> properties, ClassLoader functionClassLoader) { super(pulsarClient, pulsarSourceConfig, properties, functionClassLoader); this.pulsarClient = pulsarClient; this.pulsarSourceConfig = pulsarSourceConfig; - this.topicSchema = new TopicSchema(pulsarClient); + this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader); this.properties = properties; this.functionClassLoader = functionClassLoader; } @@ -59,8 +57,7 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> { public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { log.info("Opening pulsar source with config: {}", pulsarSourceConfig); - Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), - this.functionClassLoader); + Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), this.functionClassLoader); checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void"); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index f3a1aaef949..0a4c4058fb4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -19,7 +19,11 @@ package org.apache.pulsar.functions.source; import io.netty.buffer.ByteBuf; +import java.net.URL; +import java.net.URLClassLoader; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -49,8 +53,13 @@ public class TopicSchema { private final Map<String, Schema<?>> cachedSchemas = new HashMap<>(); private final PulsarClient client; - public TopicSchema(PulsarClient client) { + private final ClassLoader functionsClassloader; + + public TopicSchema(PulsarClient client, ClassLoader functionsClassloader) { this.client = client; + this.functionsClassloader = AccessController.doPrivileged( + (PrivilegedAction<URLClassLoader>) () -> new URLClassLoader(new URL[0], functionsClassloader) + ); } /** @@ -244,11 +253,11 @@ public class TopicSchema { @SuppressWarnings("unchecked") private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) { return newSchemaInstance(topic, clazz, new ConsumerConfig(schemaTypeOrClassName), input, - Thread.currentThread().getContextClassLoader()); + functionsClassloader); } @SuppressWarnings("unchecked") private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, ConsumerConfig conf, boolean input) { - return newSchemaInstance(topic, clazz, conf, input, Thread.currentThread().getContextClassLoader()); + return newSchemaInstance(topic, clazz, conf, input, functionsClassloader); } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java index 26ae6c84893..64de4bcb43f 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.functions.source; -import static org.testng.Assert.assertEquals; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -30,12 +28,16 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.proto.Request; import org.testng.annotations.Test; +import java.util.Optional; + +import static org.testng.Assert.assertEquals; + @Slf4j public class TopicSchemaTest { @Test public void testGetSchema() { - TopicSchema topicSchema = new TopicSchema(null); + TopicSchema topicSchema = new TopicSchema(null, Thread.currentThread().getContextClassLoader()); String TOPIC = "public/default/test"; Schema<?> schema = topicSchema.getSchema(TOPIC + "1", DummyClass.class, Optional.of(SchemaType.JSON)); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 908d95784d6..a2ff21c7176 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -24,10 +24,10 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.swagger.util.Json; import java.time.Duration; import java.util.Collections; import java.util.HashSet; @@ -39,8 +39,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - -import io.swagger.util.Json; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -72,8 +70,8 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.examples.AutoSchemaFunction; import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction; -import org.apache.pulsar.functions.api.examples.MergeTopicFunction; import org.apache.pulsar.functions.api.examples.InitializableFunction; +import org.apache.pulsar.functions.api.examples.MergeTopicFunction; import org.apache.pulsar.functions.api.examples.RecordFunction; import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject; import org.apache.pulsar.functions.api.examples.pojo.Users; @@ -883,6 +881,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String functionName, String functionFile, String functionClass, + Map<String, String> inputSerdeClassNames, String outputSerdeClassName, Map<String, String> userConfigs) throws Exception { @@ -897,6 +896,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } generator.setSinkTopic(outputTopicName); generator.setFunctionName(functionName); + generator.setCustomSerDeSourceTopics(inputSerdeClassNames); generator.setOutputSerDe(outputSerdeClassName); if (userConfigs != null) { generator.setUserConfig(userConfigs); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index e80f811fd5e..0fe77c603b9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -54,7 +54,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite { public static final String SERDE_JAVA_CLASS = "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction"; - public static final String SERDE_OUTPUT_CLASS = + public static final String SERDE_CLASS = "org.apache.pulsar.functions.api.examples.CustomBaseSerde"; public static final String EXCLAMATION_PYTHON_CLASS = diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java index 7e1c6500e3c..1ba373b64d0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java @@ -19,9 +19,9 @@ package org.apache.pulsar.tests.integration.functions.java; import static org.testng.Assert.assertEquals; - import java.util.Collections; - +import java.util.Map; +import org.apache.commons.collections4.map.HashedMap; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.common.policies.data.FunctionStatusUtil; @@ -70,10 +70,13 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest { admin.topics().createNonPartitionedTopic(outputTopicName); } + Map<String, String> inputTopicsSerde = new HashedMap<>(); + inputTopicsSerde.put(inputTopicName, SERDE_CLASS); + String functionName = "test-serde-fn-" + randomName(8); submitFunction( - Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, - SERDE_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName) + Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, inputTopicsSerde, + SERDE_CLASS, Collections.singletonMap("serde-topic", outputTopicName) ); // get function info diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java index 4fd257e1c0c..5a724081b0d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.tests.integration.functions.java; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.MappingIterator; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -26,10 +29,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - -import com.fasterxml.jackson.databind.MappingIterator; import lombok.extern.slf4j.Slf4j; import lombok.val; +import org.apache.commons.collections4.map.HashedMap; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.FunctionStatus; @@ -41,8 +43,6 @@ import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runt import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; @Slf4j public class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest { @@ -251,9 +251,12 @@ public class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest { admin.topics().createNonPartitionedTopic(outputTopicName); } + Map<String, String> inputTopicsSerde = new HashedMap<>(); + inputTopicsSerde.put(inputTopicName, SERDE_CLASS); + submitFunction( - Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, - SERDE_OUTPUT_CLASS, Collections.singletonMap(topicPrefix, outputTopicName) + Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, inputTopicsSerde, + SERDE_CLASS, Collections.singletonMap(topicPrefix, outputTopicName) ); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index 4edf4624c63..4f3078902ad 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -46,7 +46,7 @@ public class CommandGenerator { private String functionClassName; private String sourceTopic; private String sourceTopicPattern; - private Map<String, String> customSereSourceTopics; + private Map<String, String> customSerDeSourceTopics; private String sinkTopic; private String logTopic; private String outputSerDe; @@ -174,8 +174,8 @@ public class CommandGenerator { if (batchBuilder != null) { commandBuilder.append("--batch-builder" + batchBuilder); } - if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) { - commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'"); + if (customSerDeSourceTopics != null && !customSerDeSourceTopics.isEmpty()) { + commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSerDeSourceTopics) + "\'"); } if (sinkTopic != null) { commandBuilder.append(" --output " + sinkTopic); @@ -266,8 +266,8 @@ public class CommandGenerator { if (StringUtils.isNotEmpty(sourceTopic)) { commandBuilder.append(" --inputs " + sourceTopic); } - if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) { - commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'"); + if (customSerDeSourceTopics != null && !customSerDeSourceTopics.isEmpty()) { + commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSerDeSourceTopics) + "\'"); } if (batchBuilder != null) { commandBuilder.append("--batch-builder" + batchBuilder);
