This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ce56c0fb1c2a86f94c17c193cff1c4d9ba458108 Author: Rui Fu <freez...@users.noreply.github.com> AuthorDate: Tue Apr 5 03:48:05 2022 +0800 allow download package from package management service (#14814) (cherry picked from commit 7cc5cf1fadf8835fcc3166494f0e05632894da43) --- .../pulsar/functions/worker/FunctionActioner.java | 22 ++++++---- .../functions/worker/FunctionActionerTest.java | 49 ++++++++++++++++++++++ 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 5fa55409725..758157976f1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,6 +18,14 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.common.functions.Utils.FILE; +import static org.apache.pulsar.common.functions.Utils.HTTP; +import static org.apache.pulsar.common.functions.Utils.hasPackageTypePrefix; +import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported; +import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; +import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType; +import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; @@ -65,14 +73,6 @@ import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.pulsar.common.functions.Utils.FILE; -import static org.apache.pulsar.common.functions.Utils.HTTP; -import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported; -import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; -import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType; -import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType; - @Data @Slf4j public class FunctionActioner { @@ -192,7 +192,8 @@ public class FunctionActioner { return instanceConfig; } - private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException { + private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, + int instanceId) throws FileNotFoundException, IOException, PulsarAdminException { FunctionDetails details = functionMetaData.getFunctionDetails(); File pkgDir = pkgFile.getParentFile(); @@ -211,12 +212,15 @@ public class FunctionActioner { } while (tempPkgFile.exists() || !tempPkgFile.createNewFile()); String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath(); boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP); + boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath); log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(), details.getNamespace(), details.getName(), downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()); if(downloadFromHttp) { FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile); + } else if (downloadFromPackageManagementService) { + getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath()); } else { FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile); WorkerUtils.downloadFromBookkeeper( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index dc49036a5be..7502e247d99 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.client.admin.Packages; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.auth.FunctionAuthProvider; @@ -220,4 +221,52 @@ public class FunctionActionerTest { verify(functionAuthProvider.get(), times(0)).cleanUpAuthData(any(), any()); } + @Test + public void testStartFunctionWithPackageUrl() throws Exception { + + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); + workerConfig.setDownloadDirectory(downloadDir); + + RuntimeFactory factory = mock(RuntimeFactory.class); + Runtime runtime = mock(Runtime.class); + doReturn(runtime).when(factory).createContainer(any(), any(), any(), any()); + doNothing().when(runtime).start(); + Namespace dlogNamespace = mock(Namespace.class); + final String exceptionMsg = "dl namespace not-found"; + doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any()); + PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class); + Packages packages = mock(Packages.class); + doReturn(packages).when(pulsarAdmin).packages(); + doNothing().when(packages).download(any(), any()); + + @SuppressWarnings("resource") + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin); + + // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call + // RuntimeSpawner + String pkgPathLocation = "function://public/default/test-function@latest"; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") + .setNamespace("test-namespace").setName("func-1")) + .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build()) + .build(); + Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) + .build(); + FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); + doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); + + actioner.startFunction(functionRuntimeInfo); + verify(runtime, times(1)).start(); + } + }