This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 314f3fb Functions API compatibility patch for 2.0 (#1777) 314f3fb is described below commit 314f3fbb5103c6fa052aec84e8006ad2d708e500 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon May 14 15:37:39 2018 -0700 Functions API compatibility patch for 2.0 (#1777) * Functions API compatibility patch for 2.0 * remove comment * adding missing header --- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 130 ++++++++++++--------- .../org/apache/pulsar/functions/api/Context.java | 6 +- .../pulsar/functions/instance/ContextImpl.java | 20 +++- .../functions/instance/JavaInstanceRunnable.java | 26 ++--- .../instance/processors/MessageProcessorBase.java | 9 +- .../instance/src/main/python/Function_pb2.py | 125 +++++++------------- .../instance/src/main/python/contextimpl.py | 10 +- .../src/main/python/python_instance_main.py | 4 +- .../functions/api/examples/PublishFunction.java | 2 +- .../functions/api/examples/UserConfigFunction.java | 4 +- .../proto/src/main/proto/Function.proto | 4 +- .../pulsar/functions/runtime/JavaInstanceMain.java | 10 +- .../pulsar/functions/runtime/ProcessRuntime.java | 4 +- .../pulsar/functions/utils/FunctionConfig.java | 2 +- 14 files changed, 174 insertions(+), 182 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index df15fc9..dd1da6f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -18,28 +18,19 @@ */ package org.apache.pulsar.admin.cli; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.isNull; -import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.lang.reflect.Type; -import java.net.MalformedURLException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.IntStream; - +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.beust.jcommander.converters.StringConverter; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; import org.apache.bookkeeper.api.kv.result.KeyValue; @@ -52,9 +43,6 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.sink.PulsarSink; -import org.apache.pulsar.functions.source.PulsarSource; -import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -63,27 +51,31 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf; import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil; import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled; +import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec; import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; -import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType; -import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; -import org.apache.pulsar.functions.utils.FunctionDetailsUtils; +import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import com.beust.jcommander.converters.StringConverter; -import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParser; -import com.google.gson.reflect.TypeToken; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Type; +import java.net.MalformedURLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import net.jodah.typetools.TypeResolver; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.isNull; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") @@ -285,7 +277,7 @@ public class CmdFunctions extends CmdBase { } if (null != userConfigString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); - Map<String, String> userConfigMap = new Gson().fromJson(userConfigString, type); + Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type); functionConfig.setUserConfig(userConfigMap); } if (null != jarFile) { @@ -330,13 +322,7 @@ public class CmdFunctions extends CmdBase { inferMissingArguments(functionConfig); } - private void doJavaSubmitChecks(FunctionConfig functionConfig) { - if (isNull(functionConfig.getClassName())) { - throw new IllegalArgumentException("You supplied a jar file but no main class"); - } - - File file = new File(jarFile); - // check if the function class exists in Jar and it implements Function class + private void assertClassExistsInJar(File file) { if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) { throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s", functionConfig.getClassName(), jarFile)); @@ -345,16 +331,14 @@ public class CmdFunctions extends CmdBase { throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function", functionConfig.getClassName(), jarFile)); } + } - ClassLoader userJarLoader; - try { - userJarLoader = Reflections.loadJar(file); - } catch (MalformedURLException e) { - throw new RuntimeException("Failed to load user jar " + file, e); - } + private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) { + assertClassExistsInJar(file); Object userClass = Reflections.createInstance(functionConfig.getClassName(), file); Class<?>[] typeArgs; + if (userClass instanceof Function) { Function pulsarFunction = (Function) userClass; if (pulsarFunction == null) { @@ -370,6 +354,22 @@ public class CmdFunctions extends CmdBase { } typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); } + return typeArgs; + } + + private void doJavaSubmitChecks(FunctionConfig functionConfig) { + if (isNull(functionConfig.getClassName())) { + throw new IllegalArgumentException("You supplied a jar file but no main class"); + } + + File file = new File(jarFile); + ClassLoader userJarLoader; + try { + userJarLoader = Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new RuntimeException("Failed to load user jar " + file, e); + } + Class<?>[] typeArgs = getFunctionTypes(file, functionConfig); // Check if the Input serialization/deserialization class exists in jar or already loaded and that it // implements SerDe class @@ -538,34 +538,47 @@ public class CmdFunctions extends CmdBase { protected FunctionDetails convert(FunctionConfig functionConfig) throws IOException { + + Class<?>[] typeArgs = null; + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + + File file = new File(jarFile); + try { + Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new RuntimeException("Failed to load user jar " + file, e); + } + typeArgs = getFunctionTypes(file, functionConfig); + } + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); // Setup source Map<String, String> topicToSerDeClassNameMap = new HashMap<>(); topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs()); SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - sourceSpecBuilder.setClassName(PulsarSource.class.getName()); - } functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, "")); sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap); if (functionConfig.getSubscriptionType() != null) { sourceSpecBuilder .setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType())); } + if (typeArgs != null) { + sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); + } functionDetailsBuilder.setSource(sourceSpecBuilder); // Setup sink SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { - sinkSpecBuilder.setClassName(PulsarSink.class.getName()); - } if (functionConfig.getOutput() != null) { sinkSpecBuilder.setTopic(functionConfig.getOutput()); } if (functionConfig.getOutputSerdeClassName() != null) { sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName()); } + if (typeArgs != null) { + sinkSpecBuilder.setTypeClassName(typeArgs[1].getName()); + } functionDetailsBuilder.setSink(sinkSpecBuilder); if (functionConfig.getTenant() != null) { @@ -587,7 +600,7 @@ public class CmdFunctions extends CmdBase { functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime())); } if (!functionConfig.getUserConfig().isEmpty()) { - functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig()); + functionDetailsBuilder.setUserConfig(new Gson().toJson(functionConfig.getUserConfig())); } if (functionConfig.getProcessingGuarantees() != null) { functionDetailsBuilder.setProcessingGuarantees( @@ -595,6 +608,7 @@ public class CmdFunctions extends CmdBase { } functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); + return functionDetailsBuilder.build(); } diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 01d8291..7af6144 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -117,14 +117,14 @@ public interface Context { * Get a map of all user-defined key/value configs for the function * @return The full map of user-defined config values */ - Map<String, String> getUserConfigMap(); + Map<String, Object> getUserConfigMap(); /** * Get any user-defined key/value * @param key The key * @return The Optional value specified by the user for that key. */ - Optional<String> getUserConfigValue(String key); + Optional<Object> getUserConfigValue(String key); /** * Get any user-defined key/value or a default value if none is present @@ -132,7 +132,7 @@ public interface Context { * @param defaultValue * @return Either the user config value associated with a given key or a supplied default value */ - String getUserConfigValueOrDefault(String key, String defaultValue); + Object getUserConfigValueOrDefault(String key, String defaultValue); /** * Record a user defined metric diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 563e8e3..7b76df8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -18,8 +18,11 @@ */ package org.apache.pulsar.functions.instance; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import lombok.Getter; import lombok.Setter; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -93,6 +96,7 @@ class ContextImpl implements Context { @Getter @Setter private StateContextImpl stateContext; + private Map<String, Object> userConfigs; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, ClassLoader classLoader, Consumer inputConsumer) { @@ -109,6 +113,8 @@ class ContextImpl implements Context { producerConfiguration.setBatchingEnabled(true); producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); producerConfiguration.setMaxPendingMessages(1000000); + userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(), + new TypeToken<Map<String, Object>>(){}.getType()); } public void setCurrentMessageContext(MessageId messageId, String topicName) { @@ -181,18 +187,18 @@ class ContextImpl implements Context { } @Override - public Optional<String> getUserConfigValue(String key) { - return Optional.ofNullable(config.getFunctionDetails().getUserConfigOrDefault(key, null)); + public Optional<Object> getUserConfigValue(String key) { + return Optional.ofNullable(userConfigs.getOrDefault(key, null)); } @Override - public String getUserConfigValueOrDefault(String key, String defaultValue) { + public Object getUserConfigValueOrDefault(String key, String defaultValue) { return getUserConfigValue(key).orElse(defaultValue); } @Override - public Map<String, String> getUserConfigMap() { - return config.getFunctionDetails().getUserConfigMap(); + public Map<String, Object> getUserConfigMap() { + return userConfigs; } @Override @@ -221,6 +227,10 @@ class ContextImpl implements Context { } } + if (StringUtils.isEmpty(serDeClassName)) { + serDeClassName = DefaultSerDe.class.getName(); + } + if (!publishSerializers.containsKey(serDeClassName)) { SerDe serDe; if (serDeClassName.equals(DefaultSerDe.class.getName())) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index ca6414d..8a4c7af 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -19,19 +19,7 @@ package org.apache.pulsar.functions.instance; -import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; - import io.netty.buffer.ByteBuf; - -import java.util.Arrays; -import java.util.Base64; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; - import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -55,15 +43,23 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.processors.MessageProcessor; +import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.source.PulsarRecord; -import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; -import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; + +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; /** * A function container implemented using java thread. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java index 33b699a..ad963f9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java @@ -65,7 +65,8 @@ abstract class MessageProcessorBase implements MessageProcessor { org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource(); Object object; - if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) { + // If source classname is not set, we default pulsar source + if (sourceSpec.getClassName().isEmpty()) { PulsarConfig pulsarConfig = new PulsarConfig(); pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap()); @@ -80,7 +81,7 @@ abstract class MessageProcessorBase implements MessageProcessor { Class[] paramTypes = {PulsarClient.class, PulsarConfig.class}; object = Reflections.createInstance( - sourceSpec.getClassName(), + PulsarSource.class.getName(), PulsarSource.class.getClassLoader(), params, paramTypes); } else { @@ -147,7 +148,9 @@ abstract class MessageProcessorBase implements MessageProcessor { public void close() { try { - this.source.close(); + if (this.source != null) { + this.source.close(); + } } catch (Exception e) { log.warn("Failed to close source {}", this.source, e); } diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index d1a2496..cda7229 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xcb\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\x07 \x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.Fu [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf0\x02\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61uto [...] ) _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( @@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1180, - serialized_end=1259, + serialized_start=1135, + serialized_end=1214, ) _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES) @@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1261, - serialized_end=1305, + serialized_start=1216, + serialized_end=1260, ) _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE) @@ -116,49 +116,12 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=454, - serialized_end=485, + serialized_start=363, + serialized_end=394, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME) -_FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor( - name='UserConfigEntry', - full_name='proto.FunctionDetails.UserConfigEntry', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='key', full_name='proto.FunctionDetails.UserConfigEntry.key', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='value', full_name='proto.FunctionDetails.UserConfigEntry.value', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=403, - serialized_end=452, -) - _FUNCTIONDETAILS = _descriptor.Descriptor( name='FunctionDetails', full_name='proto.FunctionDetails', @@ -210,8 +173,8 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6, - number=7, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], + number=7, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), @@ -253,7 +216,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( ], extensions=[ ], - nested_types=[_FUNCTIONDETAILS_USERCONFIGENTRY, ], + nested_types=[], enum_types=[ _FUNCTIONDETAILS_RUNTIME, ], @@ -264,7 +227,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( oneofs=[ ], serialized_start=26, - serialized_end=485, + serialized_end=394, ) @@ -301,8 +264,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=668, - serialized_end=729, + serialized_start=600, + serialized_end=661, ) _SOURCESPEC = _descriptor.Descriptor( @@ -327,14 +290,21 @@ _SOURCESPEC = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=2, + name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3, number=3, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=3, + name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, @@ -352,8 +322,8 @@ _SOURCESPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=488, - serialized_end=729, + serialized_start=397, + serialized_end=661, ) @@ -379,15 +349,22 @@ _SINKSPEC = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='topic', full_name='proto.SinkSpec.topic', index=2, - number=4, type=9, cpp_type=9, label=1, + name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2, + number=5, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=3, - number=5, type=9, cpp_type=9, label=1, + name='topic', full_name='proto.SinkSpec.topic', index=3, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4, + number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -404,8 +381,8 @@ _SINKSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=731, - serialized_end=816, + serialized_start=663, + serialized_end=771, ) @@ -435,8 +412,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=818, - serialized_end=864, + serialized_start=773, + serialized_end=819, ) @@ -487,8 +464,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=867, - serialized_end=1028, + serialized_start=822, + serialized_end=983, ) @@ -525,8 +502,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1030, - serialized_end=1111, + serialized_start=985, + serialized_end=1066, ) @@ -563,13 +540,11 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1113, - serialized_end=1178, + serialized_start=1068, + serialized_end=1133, ) -_FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES -_FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = _FUNCTIONDETAILS_USERCONFIGENTRY _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME _FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC @@ -593,19 +568,11 @@ DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE _sym_db.RegisterFileDescriptor(DESCRIPTOR) FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict( - - UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict( - DESCRIPTOR = _FUNCTIONDETAILS_USERCONFIGENTRY, - __module__ = 'Function_pb2' - # @@protoc_insertion_point(class_scope:proto.FunctionDetails.UserConfigEntry) - )) - , DESCRIPTOR = _FUNCTIONDETAILS, __module__ = 'Function_pb2' # @@protoc_insertion_point(class_scope:proto.FunctionDetails) )) _sym_db.RegisterMessage(FunctionDetails) -_sym_db.RegisterMessage(FunctionDetails.UserConfigEntry) SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), dict( @@ -660,8 +627,6 @@ _sym_db.RegisterMessage(Assignment) DESCRIPTOR.has_options = True DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function')) -_FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True -_FUNCTIONDETAILS_USERCONFIGENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) # @@protoc_insertion_point(module_scope) diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index a841152..3ef1ebe 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -25,6 +25,7 @@ import time import os +import json import pulsar import util @@ -59,6 +60,9 @@ class ContextImpl(pulsar.Context): self.current_message_id = None self.current_input_topic_name = None self.current_start_time = None + self.user_config = json.loads(instance_config.function_details.userConfig) \ + if instance_config.function_details.userConfig \ + else []; # Called on a per message basis to set the context for the current message def set_current_message_context(self, msgid, topic): @@ -94,13 +98,13 @@ class ContextImpl(pulsar.Context): return self.log def get_user_config_value(self, key): - if key in self.instance_config.function_details.userConfig: - return str(self.instance_config.function_details.userConfig[key]) + if key in self.user_config: + return self.user_config[key] else: return None def get_user_config_map(self): - return self.instance_config.function_details.userConfig + return self.user_config def record_metric(self, metric_name, metric_value): if not metric_name in self.accumulated_metrics: diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index f2763a9..7454132 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -116,9 +116,7 @@ def main(): else: function_details.autoAck = False if args.user_config != None and len(args.user_config) != 0: - user_config = json.loads(args.user_config) - for (key, value) in user_config.items(): - function_details.userConfig[str(key)] = str(value) + function_details.userConfig = args.user_config pulsar_client = pulsar.Client(args.pulsar_serviceurl) pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id), diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java index 01e3852..e97b692 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java @@ -25,7 +25,7 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe; public class PublishFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { - String publishTopic = context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish"); + String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish"); String output = String.format("%s!", input); context.publish(publishTopic, output, DefaultSerDe.class.getName()); return null; diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java index 6ff3dd8..fb3ceb0 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java @@ -28,11 +28,11 @@ public class UserConfigFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { String key = "config-key"; - Optional<String> maybeValue = context.getUserConfigValue(key); + Optional<Object> maybeValue = context.getUserConfigValue(key); Logger LOG = context.getLogger(); if (maybeValue.isPresent()) { - String value = maybeValue.get(); + String value = (String) maybeValue.get(); LOG.info("The config value is {}", value); } else { LOG.error("No value present for the key {}", key); diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index f4b1fb8..3c1414e 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -45,7 +45,7 @@ message FunctionDetails { string className = 4; string logTopic = 5; ProcessingGuarantees processingGuarantees = 6; - map<string,string> userConfig = 7; + string userConfig = 7; Runtime runtime = 8; bool autoAck = 9; int32 parallelism = 10; @@ -57,6 +57,7 @@ message SourceSpec { string className = 1; // map in json format string configs = 2; + string typeClassName = 5; // configs used only when source feeds into functions SubscriptionType subscriptionType = 3; @@ -67,6 +68,7 @@ message SinkSpec { string className = 1; // map in json format string configs = 2; + string typeClassName = 5; // configs used only when functions output to sink string topic = 3; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index e0330bb..f19dc26 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -100,7 +100,7 @@ public class JavaInstanceMain { @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") protected String autoAck = "true"; - @Parameter(names = "--source_classname", description = "The source classname", required = true) + @Parameter(names = "--source_classname", description = "The source classname") protected String sourceClassname; @Parameter(names = "--source_configs", description = "The source configs") @@ -149,13 +149,13 @@ public class JavaInstanceMain { functionDetailsBuilder.setAutoAck(false); } if (userConfig != null && !userConfig.isEmpty()) { - Type type = new TypeToken<Map<String, String>>(){}.getType(); - Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type); - functionDetailsBuilder.putAllUserConfig(userConfigMap); + functionDetailsBuilder.setUserConfig(userConfig); } SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder(); - sourceDetailsBuilder.setClassName(sourceClassname); + if (sourceClassname != null) { + sourceDetailsBuilder.setClassName(sourceClassname); + } if (sourceConfigs != null && !sourceConfigs.isEmpty()) {; sourceDetailsBuilder.setConfigs(sourceConfigs); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 3aae26f..10d04d1 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -134,10 +134,10 @@ class ProcessRuntime implements Runtime { args.add(pulsarServiceUrl); args.add("--max_buffered_tuples"); args.add(String.valueOf(instanceConfig.getMaxBufferedTuples())); - Map<String, String> userConfig = instanceConfig.getFunctionDetails().getUserConfigMap(); + String userConfig = instanceConfig.getFunctionDetails().getUserConfig(); if (userConfig != null && !userConfig.isEmpty()) { args.add("--user_config"); - args.add(new Gson().toJson(userConfig)); + args.add(userConfig); } instancePort = findAvailablePort(); args.add("--port"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index cb7d0a2..04bb771 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -74,7 +74,7 @@ public class FunctionConfig { private String logTopic; private ProcessingGuarantees processingGuarantees; - private Map<String, String> userConfig = new HashMap<>(); + private Map<String, Object> userConfig = new HashMap<>(); private SubscriptionType subscriptionType; private Runtime runtime; private boolean autoAck; -- To stop receiving notification emails like this one, please contact mme...@apache.org.