This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 33c9fb77a8391e2a68cccb8d821f66c76f071851 Author: Boyang Jerry Peng <[email protected]> AuthorDate: Mon Aug 9 19:01:42 2021 -0700 Fix: Cast exception occurs if function/source/sink type is ByteBuffer (#11611) Co-authored-by: Jerry Peng <[email protected]> (cherry picked from commit b7e027b7290eef2f2daf22dde57395ede1c00985) --- .../worker/PulsarFunctionLocalRunTest.java | 51 +++++++++++++++++++--- .../pulsar/functions/source/TopicSchema.java | 6 ++- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 9477af4..232f7dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -37,6 +37,7 @@ import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.Arrays; import java.util.Collections; @@ -74,19 +75,20 @@ import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.LocalRunner; +import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -851,7 +853,7 @@ public class PulsarFunctionLocalRunTest { runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2)); } - private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception { + private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sourceTopic = "persistent://" + replNamespace + "/input"; @@ -869,7 +871,9 @@ public class PulsarFunctionLocalRunTest { SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName); sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build())); - if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) { + if (className != null) { + sinkConfig.setClassName(className); + } else if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) { sinkConfig.setClassName("org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink"); } @@ -935,6 +939,14 @@ public class PulsarFunctionLocalRunTest { if (m != null) { metricsMap.put(m.tags.get("instance_id"), m); } + } else if (line.startsWith("pulsar_sink_sink_exceptions_total")) { + Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line); + assertFalse(metrics.isEmpty()); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_sink_exceptions_total"); + if (m == null) { + m = metrics.get("pulsar_sink_sink_exceptions_total_1min"); + } + assertEquals(m.value, 0); } }); Assert.assertEquals(metricsMap.size(), parallelism); @@ -972,7 +984,11 @@ public class PulsarFunctionLocalRunTest { } private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception { - testPulsarSourceLocalRun(jarFilePathUrl, 1); + testPulsarSinkLocalRun(jarFilePathUrl, 1); + } + + private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception { + testPulsarSinkLocalRun(jarFilePathUrl, parallelism, null); } @Test(timeOut = 20000, groups = "builtin") @@ -1001,6 +1017,31 @@ public class PulsarFunctionLocalRunTest { runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2)); } + public static class StatsNullSink implements Sink<ByteBuffer> { + volatile long bytesTotal = 0; + + @Override + public void open(Map map, final SinkContext sinkContext) throws Exception { + + } + + @Override + public void write(Record<ByteBuffer> record) throws Exception { + bytesTotal += record.getValue().capacity(); + record.ack(); + } + + @Override + public void close() throws Exception { + + } + } + + @Test + public void test() throws Throwable{ + runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName())); + } + private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index caf7ff8..dcd424e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -145,7 +145,11 @@ public class TopicSchema { private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type, ConsumerConfig conf) { switch (type) { case NONE: - return (Schema<T>) Schema.BYTES; + if (ByteBuffer.class.isAssignableFrom(clazz)) { + return (Schema<T>) Schema.BYTEBUFFER; + } else { + return (Schema<T>) Schema.BYTES; + } case AUTO_CONSUME: case AUTO:
