merlimat closed pull request #2114: Submit and run locally builtin connectors URL: https://github.com/apache/incubator-pulsar/pull/2114
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/pulsar b/bin/pulsar index d8f6e213cb..c28bfe0210 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -19,7 +19,7 @@ # BINDIR=$(dirname "$0") -PULSAR_HOME=`cd $BINDIR/..;pwd` +export PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf diff --git a/bin/pulsar-admin b/bin/pulsar-admin index c984617864..1a1339d8ab 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,7 +19,7 @@ # BINDIR=$(dirname "$0") -PULSAR_HOME=`cd $BINDIR/..;pwd` +export PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml @@ -55,7 +55,7 @@ fi # exclude tests jar BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then +if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then echo "\nCouldn't find pulsar jar."; echo "Make sure you've run 'mvn package'\n"; exit 1; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index d34f419f92..9f7339c690 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.admin; import java.util.List; +import java.util.Set; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -232,4 +233,22 @@ * */ List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException; + + /** + * Fetches a list of supported Pulsar IO sources currently running in cluster mode + * + * @throws PulsarAdminException + * Unexpected error + * + */ + Set<String> getSources() throws PulsarAdminException; + + /** + * Fetches a list of supported Pulsar IO sinks currently running in cluster mode + * + * @throws PulsarAdminException + * Unexpected error + * + */ + Set<String> getSinks() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 5cb9284b94..a2008cf9f3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -27,6 +27,8 @@ import java.io.InputStream; import java.nio.file.StandardCopyOption; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.Entity; @@ -37,6 +39,7 @@ import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; @@ -110,7 +113,10 @@ public void createFunction(FunctionDetails functionDetails, String fileName) thr try { final FormDataMultiPart mp = new FormDataMultiPart(); - mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + if (fileName != null && !fileName.startsWith("builtin://")) { + // If the function code is built in, we don't need to submit here + mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + } mp.bodyPart(new FormDataBodyPart("functionDetails", printJson(functionDetails), @@ -153,9 +159,12 @@ public void deleteFunction(String cluster, String namespace, String function) th public void updateFunction(FunctionDetails functionDetails, String fileName) throws PulsarAdminException { try { final FormDataMultiPart mp = new FormDataMultiPart(); - if (fileName != null) { + + if (fileName != null && !fileName.startsWith("builtin://")) { + // If the function code is built in, we don't need to submit here mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE)); } + mp.bodyPart(new FormDataBodyPart("functionDetails", printJson(functionDetails), MediaType.APPLICATION_JSON_TYPE)); @@ -251,6 +260,18 @@ public void downloadFunction(String destinationPath, String path) throws PulsarA } } + @Override + public Set<String> getSources() throws PulsarAdminException { + return getConnectorsList().stream().filter(c -> !StringUtils.isEmpty(c.getSourceClass())) + .map(ConnectorDefinition::getName).collect(Collectors.toSet()); + } + + @Override + public Set<String> getSinks() throws PulsarAdminException { + return getConnectorsList().stream().filter(c -> !StringUtils.isEmpty(c.getSinkClass())) + .map(ConnectorDefinition::getName).collect(Collectors.toSet()); + } + public static void mergeJson(String json, Builder builder) throws IOException { JsonFormat.parser().merge(json, builder); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 2c91588f05..c27b40aa8d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -24,12 +24,26 @@ import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; @@ -44,6 +58,7 @@ import org.apache.pulsar.functions.utils.SinkConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import java.io.File; @@ -139,6 +154,25 @@ void runCmd() throws Exception { .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), sinkConfig.getArchive(), admin); } + + @Override + protected String validateSinkType(String sinkType) throws IOException { + // Validate the connector sink type from the locally available connectors + String pulsarHome = System.getenv("PULSAR_HOME"); + if (pulsarHome == null) { + pulsarHome = Paths.get("").toAbsolutePath().toString(); + } + String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); + Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); + + if (!connectors.getSinks().containsKey(sinkType)) { + throw new ParameterException("Invalid sink type '" + sinkType + "' -- Available sinks are: " + + connectors.getSinks().keySet()); + } + + // Sink type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSinks().get(sinkType).toString(); + } } @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run in a Pulsar cluster") @@ -174,6 +208,10 @@ void runCmd() throws Exception { protected String namespace; @Parameter(names = "--name", description = "The sink's name") protected String name; + + @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider") + protected String sinkType; + @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)") protected String inputs; @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)") @@ -246,10 +284,18 @@ void processArguments() throws Exception { sinkConfig.setParallelism(parallelism); } + if (archive != null && sinkType != null) { + throw new ParameterException("Cannot specify both archive and sink-type"); + } + if (null != archive) { sinkConfig.setArchive(archive); } + if (sinkType != null) { + sinkConfig.setArchive(validateSinkType(sinkType)); + } + org.apache.pulsar.functions.utils.Resources resources = sinkConfig.getResources(); if (resources == null) { resources = new org.apache.pulsar.functions.utils.Resources(); @@ -307,6 +353,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { throw new ParameterException("Sink archive not specfied"); } + boolean isConnectorBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); boolean isArchivePathUrl = Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive()); String archivePath = null; @@ -326,6 +373,9 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { + ", due to =" + e.getMessage()); } } + } else if (isConnectorBuiltin) { + // Ignore local checks when submitting built-in connector + archivePath = null; } else { archivePath = sinkConfig.getArchive(); } @@ -339,6 +389,7 @@ protected void validateSinkConfigs(SinkConfig sinkConfig) { try { ConnectorDefinition connector = ConnectorUtils.getConnectorDefinition(archivePath); log.info("Connector: {}", connector); + // Validate sink class ConnectorUtils.getIOSinkClass(archivePath); } catch (IOException e) { @@ -368,16 +419,23 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep // check if configs are valid validateSinkConfigs(sinkConfig); - String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); + String sinkClassName = null; + String typeArg = null; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + boolean isBuiltin = sinkConfig.getArchive().startsWith(Utils.BUILTIN); - String typeArg; - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), - Collections.emptySet())) { - typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null - : Utils.getSinkType(sinkClassName, ncl).getName(); + if (!isBuiltin) { + sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); + + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), + Collections.emptySet())) { + typeArg = sinkConfig.getArchive().startsWith(Utils.FILE) ? null + : Utils.getSinkType(sinkClassName, ncl).getName(); + } } - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (sinkConfig.getTenant() != null) { functionDetailsBuilder.setTenant(sinkConfig.getTenant()); } @@ -414,7 +472,15 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep // set up sink spec SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); - sinkSpecBuilder.setClassName(sinkClassName); + if (sinkClassName != null) { + sinkSpecBuilder.setClassName(sinkClassName); + } + + if (isBuiltin) { + String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", ""); + sinkSpecBuilder.setBuiltin(builtin); + } + if (sinkConfig.getConfigs() != null) { sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs())); } @@ -438,6 +504,23 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep } return functionDetailsBuilder.build(); } + + protected String validateSinkType(String sinkType) throws IOException { + Set<String> availableSinks; + try { + availableSinks = admin.functions().getSinks(); + } catch (PulsarAdminException e) { + throw new IOException(e); + } + + if (!availableSinks.contains(sinkType)) { + throw new ParameterException( + "Invalid sink type '" + sinkType + "' -- Available sinks are: " + availableSinks); + } + + // Source type is a valid built-in connector type + return "builtin://" + sinkType; + } } @Parameters(commandDescription = "Stops a Pulsar IO sink connector") @@ -475,7 +558,7 @@ void runCmd() throws Exception { } @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster") - public class ListSinks extends SinkCommand { + public class ListSinks extends BaseCommand { @Override void runCmd() throws Exception { admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSinkClass())) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 53609d0c0b..7c147e5f1d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -24,12 +24,23 @@ import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; @@ -43,6 +54,7 @@ import org.apache.pulsar.functions.utils.SourceConfig; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; import java.io.File; @@ -135,6 +147,25 @@ void runCmd() throws Exception { .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), sourceConfig.getArchive(), admin); } + + @Override + protected String validateSourceType(String sourceType) throws IOException { + // Validate the connector source type from the locally available connectors + String pulsarHome = System.getenv("PULSAR_HOME"); + if (pulsarHome == null) { + pulsarHome = Paths.get("").toAbsolutePath().toString(); + } + String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); + Connectors connectors = ConnectorUtils.searchForConnectors(connectorsDir); + + if (!connectors.getSources().containsKey(sourceType)) { + throw new ParameterException("Invalid source type '" + sourceType + "' -- Available sources are: " + + connectors.getSources().keySet()); + } + + // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path + return connectors.getSources().get(sourceType).toString(); + } } @Parameters(commandDescription = "Submit a Pulsar IO source connector to run in a Pulsar cluster") @@ -170,10 +201,12 @@ void runCmd() throws Exception { protected String namespace; @Parameter(names = "--name", description = "The source's name") protected String name; + + @Parameter(names = { "-t", "--source-type" }, description = "The source's connector provider") + protected String sourceType; + @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source") protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--className", description = "The source's class name") - protected String className; @Parameter(names = "--destinationTopicName", description = "The Pulsar topic to which data is sent") protected String destinationTopicName; @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source") @@ -228,10 +261,18 @@ void processArguments() throws Exception { sourceConfig.setParallelism(parallelism); } + if (archive != null && sourceType != null) { + throw new ParameterException("Cannot specify both archive and source-type"); + } + if (archive != null) { sourceConfig.setArchive(archive); } + if (sourceType != null) { + sourceConfig.setArchive(validateSourceType(sourceType)); + } + org.apache.pulsar.functions.utils.Resources resources = sourceConfig.getResources(); if (resources == null) { resources = new org.apache.pulsar.functions.utils.Resources(); @@ -276,6 +317,7 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) { throw new ParameterException("Source archive not specfied"); } + boolean isConnectorBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); boolean isArchivePathUrl = Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive()); String archivePath = null; @@ -295,6 +337,9 @@ protected void validateSourceConfigs(SourceConfig sourceConfig) { + ", due to =" + e.getMessage()); } } + } else if (isConnectorBuiltin) { + // Ignore local checks when submitting built-in connector + archivePath = null; } else { archivePath = sourceConfig.getArchive(); } @@ -338,16 +383,23 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) throws I // check if source configs are valid validateSourceConfigs(sourceConfig); - String sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); + String sourceClassName = null; + String typeArg = null; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN); + + if (!isBuiltin) { + sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); - String typeArg; - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), - Collections.emptySet())) { - typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) ? null - : getSourceType(sourceClassName, ncl).getName(); + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), + Collections.emptySet())) { + typeArg = sourceConfig.getArchive().startsWith(Utils.FILE) ? null + : getSourceType(sourceClassName, ncl).getName(); + } } - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (sourceConfig.getTenant() != null) { functionDetailsBuilder.setTenant(sourceConfig.getTenant()); } @@ -368,7 +420,14 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) throws I // set source spec SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setClassName(sourceClassName); + if (sourceClassName != null) { + sourceSpecBuilder.setClassName(sourceClassName); + } + + if (isBuiltin) { + String builtin = sourceConfig.getArchive().replaceFirst("^builtin://", ""); + sourceSpecBuilder.setBuiltin(builtin); + } if (sourceConfig.getConfigs() != null) { sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs())); @@ -409,6 +468,23 @@ protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) throws I return functionDetailsBuilder.build(); } + + protected String validateSourceType(String sourceType) throws IOException { + Set<String> availableSources; + try { + availableSources = admin.functions().getSources(); + } catch (PulsarAdminException e) { + throw new IOException(e); + } + + if (!availableSources.contains(sourceType)) { + throw new ParameterException( + "Invalid source type '" + sourceType + "' -- Available sources are: " + availableSources); + } + + // Source type is a valid built-in connector type + return "builtin://" + sourceType; + } } @Parameters(commandDescription = "Stops a Pulsar IO source connector") @@ -446,7 +522,7 @@ void runCmd() throws Exception { } @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster") - public class ListSources extends SourceCommand { + public class ListSources extends BaseCommand { @Override void runCmd() throws Exception { admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass())) @@ -457,5 +533,4 @@ void runCmd() throws Exception { }); } } - } diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 0f8b649d65..01b3660b6e 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -71,6 +71,10 @@ message SourceSpec { map<string,string> topicsToSerDeClassName = 4; uint64 timeoutMs = 6; string topicsPattern = 7; + + /* If specified, this will refer to an archive that is + * already present in the server */ + string builtin = 8; } message SinkSpec { @@ -82,6 +86,10 @@ message SinkSpec { // configs used only when functions output to sink string topic = 3; string serDeClassName = 4; + + /* If specified, this will refer to an archive that is + * already present in the server */ + string builtin = 6; } message PackageLocationMetaData { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 8b5e9fbeff..0c25be29ca 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -55,6 +55,7 @@ public static String HTTP = "http"; public static String FILE = "file"; + public static String BUILTIN = "builtin"; public static final long getSequenceId(MessageId messageId) { MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) @@ -222,7 +223,7 @@ public static boolean fileExists(String file) { } public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { - return isNotBlank(functionPkgUrl) - && (functionPkgUrl.startsWith(Utils.HTTP) || functionPkgUrl.startsWith(Utils.FILE)); + return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(Utils.HTTP) + || functionPkgUrl.startsWith(Utils.FILE)); } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index 7ff6b974de..297a2f27d0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -643,6 +643,11 @@ public void validateField(String name, Object o) { @Override public void validateField(String name, Object o) { SourceConfig sourceConfig = (SourceConfig) o; + if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) { + // We don't have to check the archive, since it's provided on the worker itself + return; + } + String sourceClassName; try { sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); @@ -714,6 +719,11 @@ public void validateField(String name, Object o) { @Override public void validateField(String name, Object o) { SinkConfig sinkConfig = (SinkConfig) o; + if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) { + // We don't have to check the archive, since it's provided on the worker itself + return; + } + // if function-pkg url is present eg: file://xyz.jar then admin-tool might not have access of the file at // the same location so, need to rely on server side validation. if (Utils.isFunctionPackageUrlSupported(sinkConfig.getArchive())) { @@ -805,7 +815,7 @@ public void validateField(String name, Object o) { if(!Utils.isFunctionPackageUrlSupported(path)) { // check file existence if path is not url and local path - if (!fileExists(path)) { + if (!path.startsWith(Utils.BUILTIN) && !fileExists(path)) { throw new IllegalArgumentException (String.format("File %s specified in field '%s' does not exist", path, name)); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index ef519f4dce..4741c404d6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -20,8 +20,13 @@ import static org.apache.pulsar.functions.utils.Utils.FILE; import static org.apache.pulsar.functions.utils.Utils.HTTP; +import static org.apache.pulsar.functions.utils.Utils.getSourceType; import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.MoreFiles; +import com.google.common.io.RecursiveDeleteOption; + import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -30,29 +35,32 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.MoreFiles; -import com.google.common.io.RecursiveDeleteOption; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.utils.io.ConnectorUtils; @Data @Setter @@ -68,15 +76,18 @@ private LinkedBlockingQueue<FunctionAction> actionQueue; private volatile boolean running; private Thread actioner; + private final ConnectorsManager connectorsManager; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, - LinkedBlockingQueue<FunctionAction> actionQueue) { + LinkedBlockingQueue<FunctionAction> actionQueue, + ConnectorsManager connectorsManager) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; this.actionQueue = actionQueue; + this.connectorsManager = connectorsManager; actioner = new Thread(() -> { log.info("Starting Actioner Thread..."); while(running) { @@ -118,30 +129,33 @@ public void join() throws InterruptedException { protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - log.info("Starting function {} - {} ...", - functionMetaData.getFunctionDetails().getName(), instanceId); + + FunctionDetails.Builder functionDetails = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()); + log.info("Starting function {} - {} ...", functionDetails.getName(), instanceId); File pkgFile = null; - + String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); - - if(isPkgUrlProvided && pkgLocation.startsWith(FILE)) { + + if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { URL url = new URL(pkgLocation); pkgFile = new File(url.toURI()); + } else if (isFunctionCodeBuiltin(functionDetails)) { + pkgFile = getBuiltinArchive(functionDetails); } else { File pkgDir = new File( workerConfig.getDownloadDirectory(), getDownloadPackagePath(functionMetaData, instanceId)); pkgDir.mkdirs(); - + pkgFile = new File( pkgDir, new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName()); downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); } - + InstanceConfig instanceConfig = new InstanceConfig(); - instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails()); + instanceConfig.setFunctionDetails(functionDetails.build()); // TODO: set correct function id and version when features implemented instanceConfig.setFunctionId(UUID.randomUUID().toString()); instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); @@ -156,9 +170,9 @@ protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exc } private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException { - + File pkgDir = pkgFile.getParentFile(); - + if (pkgFile.exists()) { log.warn("Function package exists already {} deleting it", pkgFile); @@ -178,7 +192,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP); log.info("Function package file {} will be downloaded from {}", tempPkgFile, downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()); - + if(downloadFromHttp) { Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile)); } else { @@ -187,7 +201,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa new FileOutputStream(tempPkgFile), pkgLocationPath); } - + try { // create a hardlink, if there are two concurrent createLink operations, one will fail. // this ensures one instance will successfully download the package. @@ -243,4 +257,70 @@ private String getDownloadPackagePath(FunctionMetaData functionMetaData, int ins }, File.separatorChar); } + + public static boolean isFunctionCodeBuiltin(FunctionDetailsOrBuilder functionDetails) { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + return true; + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + return true; + } + } + + return false; + } + + private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile(); + String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass(); + SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource()); + builder.setClassName(sourceClass); + functionDetails.setSource(builder); + + fillSourceSinkTypeClass(functionDetails, archive, sourceClass); + return archive; + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile(); + String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass(); + SinkSpec.Builder builder = SinkSpec.newBuilder(functionDetails.getSink()); + builder.setClassName(sinkClass); + functionDetails.setSink(builder); + + fillSourceSinkTypeClass(functionDetails, archive, sinkClass); + return archive; + } + } + + throw new IOException("Could not find built in archive definition"); + } + + private void fillSourceSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className) + throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) { + String typeArg = getSourceType(className, ncl).getName(); + + SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(functionDetails.getSource()); + sourceBuilder.setTypeClassName(typeArg); + functionDetails.setSource(sourceBuilder); + + SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(functionDetails.getSink()); + sinkBuilder.setTypeClassName(typeArg); + functionDetails.setSink(sinkBuilder); + } + } + } \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 5c6184f96e..08de636d2a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -78,13 +78,16 @@ private RuntimeFactory runtimeFactory; private MembershipManager membershipManager; + private final ConnectorsManager connectorsManager; public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Namespace dlogNamespace, - MembershipManager membershipManager) throws Exception { + MembershipManager membershipManager, + ConnectorsManager connectorsManager) throws Exception { this.workerConfig = workerConfig; + this.connectorsManager = connectorsManager; Reader<byte[]> reader = pulsarClient.newReader() .topic(this.workerConfig.getFunctionAssignmentTopic()) @@ -99,7 +102,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()) .useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()) .tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build(); - + if (workerConfig.getThreadContainerFactory() != null) { this.runtimeFactory = new ThreadRuntimeFactory( workerConfig.getThreadContainerFactory().getThreadGroupName(), @@ -121,7 +124,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, this.actionQueue = new LinkedBlockingQueue<>(); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, actionQueue); + dlogNamespace, actionQueue, connectorsManager); this.membershipManager = membershipManager; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 0237700f29..439333190f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -102,12 +102,14 @@ public void start(URI dlogUri) throws InterruptedException { this.functionMetaDataManager = new FunctionMetaDataManager( this.workerConfig, this.schedulerManager, this.client); + this.connectorsManager = new ConnectorsManager(workerConfig); + //create membership manager this.membershipManager = new MembershipManager(this.workerConfig, this.client); // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this.client, this.dlogNamespace, this.membershipManager); + this.workerConfig, this.client, this.dlogNamespace, this.membershipManager, connectorsManager); // Setting references to managers in scheduler this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); @@ -136,8 +138,6 @@ public void start(URI dlogUri) throws InterruptedException { // indicate function worker service is done intializing this.isInitialized = true; - this.connectorsManager = new ConnectorsManager(workerConfig); - } catch (Throwable t) { log.error("Error Starting up in worker", t); throw new RuntimeException(t); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index a9de4d7ba9..4ff66c6dbe 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -25,6 +25,8 @@ import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported; import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create; +import com.google.gson.Gson; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -51,6 +53,8 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; +import lombok.extern.slf4j.Slf4j; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Message; @@ -64,6 +68,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders; @@ -77,9 +83,6 @@ import org.apache.pulsar.io.core.Source; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import com.google.gson.Gson; - -import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; @Slf4j @@ -148,12 +151,17 @@ public Response registerFunction(final String tenant, final String namespace, fi FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder() .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); - PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); - functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); + boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); + if (isBuiltin) { + packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); + } else { + packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl + : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + } - return isPkgUrlProvided ? updateRequest(functionMetaDataBuilder.build()) + functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } @@ -193,12 +201,18 @@ public Response updateFunction(final String tenant, final String namespace, fina FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder() .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0); - PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder() - .setPackagePath(isPkgUrlProvided ? functionPkgUrl - : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); - functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); + + boolean isBuiltin = isFunctionCodeBuiltin(functionDetails); + if (isBuiltin) { + packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails)); + } else { + packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl + : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName())); + } - return isPkgUrlProvided ? updateRequest(functionMetaDataBuilder.build()) + functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + return (isPkgUrlProvided || isBuiltin) ? updateRequest(functionMetaDataBuilder.build()) : updateRequest(functionMetaDataBuilder.build(), uploadedInputStream); } @@ -659,12 +673,53 @@ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, Str private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionDetailsJson) throws IllegalArgumentException { - if (uploadedInputStream == null || fileDetail == null) { + + FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, + functionDetailsJson, null); + if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStream == null || fileDetail == null)) { throw new IllegalArgumentException("Function Package is not provided"); } - return validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson, null); + + return functionDetails; + } + + private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + return true; + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + return true; + } + } + + return false; } + private String getFunctionCodeBuiltin(FunctionDetails functionDetails) { + if (functionDetails.hasSource()) { + SourceSpec sourceSpec = functionDetails.getSource(); + if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + return sourceSpec.getBuiltin(); + } + } + + if (functionDetails.hasSink()) { + SinkSpec sinkSpec = functionDetails.getSink(); + if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + return sinkSpec.getBuiltin(); + } + } + + return null; + } + + private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, String functionDetailsJson, File jarWithFileUrl) throws IllegalArgumentException { if (tenant == null) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index a1bcd4a424..575447792c 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -47,7 +47,7 @@ /** * Validates FunctionActioner tries to download file from bk. - * + * * @throws Exception */ @Test @@ -68,7 +68,8 @@ public void testStartFunctionWithDLNamespace() throws Exception { LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>(); @SuppressWarnings("resource") - FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue); + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, + new ConnectorsManager(workerConfig)); Runtime runtime = mock(Runtime.class); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") @@ -110,7 +111,8 @@ public void testStartFunctionWithPkgUrl() throws Exception { LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>(); @SuppressWarnings("resource") - FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue); + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, + new ConnectorsManager(workerConfig)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 690f474150..8ab7473eed 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -92,7 +92,8 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( @@ -185,7 +186,8 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( @@ -282,7 +284,8 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 6dd3fa3c3c..6bf134306e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -135,7 +135,8 @@ public void testCheckFailuresNoFailures() throws Exception { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); @@ -199,7 +200,8 @@ public void testCheckFailuresSomeFailures() throws Exception { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); @@ -288,7 +290,8 @@ public void testCheckFailuresSomeUnassigned() throws Exception { workerConfig, pulsarClient, mock(Namespace.class), - mock(MembershipManager.class) + mock(MembershipManager.class), + mock(ConnectorsManager.class) )); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services