This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ce56c0fb1c2a86f94c17c193cff1c4d9ba458108
Author: Rui Fu <freez...@users.noreply.github.com>
AuthorDate: Tue Apr 5 03:48:05 2022 +0800

    allow download package from package management service (#14814)
    
    (cherry picked from commit 7cc5cf1fadf8835fcc3166494f0e05632894da43)
---
 .../pulsar/functions/worker/FunctionActioner.java  | 22 ++++++----
 .../functions/worker/FunctionActionerTest.java     | 49 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 5fa55409725..758157976f1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.common.functions.Utils.FILE;
+import static org.apache.pulsar.common.functions.Utils.HTTP;
+import static org.apache.pulsar.common.functions.Utils.hasPackageTypePrefix;
+import static 
org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
+import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
@@ -65,14 +73,6 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.pulsar.common.functions.Utils.FILE;
-import static org.apache.pulsar.common.functions.Utils.HTTP;
-import static 
org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
-import static 
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
-
 @Data
 @Slf4j
 public class FunctionActioner {
@@ -192,7 +192,8 @@ public class FunctionActioner {
         return instanceConfig;
     }
 
-    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, 
FunctionMetaData functionMetaData, int instanceId) throws 
FileNotFoundException, IOException {
+    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, 
FunctionMetaData functionMetaData,
+                              int instanceId) throws FileNotFoundException, 
IOException, PulsarAdminException {
 
         FunctionDetails details = functionMetaData.getFunctionDetails();
         File pkgDir = pkgFile.getParentFile();
@@ -211,12 +212,15 @@ public class FunctionActioner {
         } while (tempPkgFile.exists() || !tempPkgFile.createNewFile());
         String pkgLocationPath = 
functionMetaData.getPackageLocation().getPackagePath();
         boolean downloadFromHttp = isPkgUrlProvided && 
pkgLocationPath.startsWith(HTTP);
+        boolean downloadFromPackageManagementService = isPkgUrlProvided && 
hasPackageTypePrefix(pkgLocationPath);
         log.info("{}/{}/{} Function package file {} will be downloaded from 
{}", tempPkgFile, details.getTenant(),
                 details.getNamespace(), details.getName(),
                 downloadFromHttp ? pkgLocationPath : 
functionMetaData.getPackageLocation());
 
         if(downloadFromHttp) {
             FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
+        } else if (downloadFromPackageManagementService) {
+            getPulsarAdmin().packages().download(pkgLocationPath, 
tempPkgFile.getPath());
         } else {
             FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile);
             WorkerUtils.downloadFromBookkeeper(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index dc49036a5be..7502e247d99 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.Packages;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
@@ -220,4 +221,52 @@ public class FunctionActionerTest {
         verify(functionAuthProvider.get(), times(0)).cleanUpAuthData(any(), 
any());
     }
 
+    @Test
+    public void testStartFunctionWithPackageUrl() throws Exception {
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(
+                        new 
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setFunctionAssignmentTopicName("assignments");
+        String downloadDir = 
this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+        workerConfig.setDownloadDirectory(downloadDir);
+
+        RuntimeFactory factory = mock(RuntimeFactory.class);
+        Runtime runtime = mock(Runtime.class);
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), 
any());
+        doNothing().when(runtime).start();
+        Namespace dlogNamespace = mock(Namespace.class);
+        final String exceptionMsg = "dl namespace not-found";
+        doThrow(new 
IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
+        PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
+        Packages packages = mock(Packages.class);
+        doReturn(packages).when(pulsarAdmin).packages();
+        doNothing().when(packages).download(any(), any());
+
+        @SuppressWarnings("resource")
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace,
+                new ConnectorsManager(workerConfig), new 
FunctionsManager(workerConfig), pulsarAdmin);
+
+        // (1) test with file url. functionActioner should be able to consider 
file-url and it should be able to call
+        // RuntimeSpawner
+        String pkgPathLocation = 
"function://public/default/test-function@latest";
+        Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
+                
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
+                        .setNamespace("test-namespace").setName("func-1"))
+                
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
+                .build();
+        Function.Instance instance = 
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
+                .build();
+        FunctionRuntimeInfo functionRuntimeInfo = 
mock(FunctionRuntimeInfo.class);
+        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+
+        actioner.startFunction(functionRuntimeInfo);
+        verify(runtime, times(1)).start();
+    }
+
 }

Reply via email to