This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c3bbc0913bddd188305017b710700e84afe6763
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sun Jan 24 18:42:12 2021 -0800

    Fix: don't attempt to clean up packages when Source/Sink is builtin (#9289)
    
    ### Motivation
    
    For Pulsar Functions / IO, don't attempt to clean up packages when 
Source/Sink is builtin.  Though it doesn't really cause any problems even if we 
do just a nasty exception gets logged.
    
    
    
    (cherry picked from commit 47b05c45000371ad26f5a197023e34ade7834fb2)
---
 .../functions/worker/rest/api/ComponentImpl.java   |  5 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 77 ++++++++++++++++++++++
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 77 ++++++++++++++++++++++
 3 files changed, 158 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 29766e6..56771f6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -400,7 +400,10 @@ public abstract class ComponentImpl {
                         ComponentTypeUtils.toString(componentType), tenant, 
namespace, componentName));
 
         // clean up component files stored in BK
-        if 
(!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) 
&& 
!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) 
{
+        String functionPackagePath = 
functionMetaData.getPackageLocation().getPackagePath();
+        if (!functionPackagePath.startsWith(Utils.HTTP)
+                && !functionPackagePath.startsWith(Utils.FILE)
+                && !functionPackagePath.startsWith(Utils.BUILTIN)) {
             try {
                 WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), 
functionMetaData.getPackageLocation().getPackagePath());
             } catch (IOException e) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 6b55106..ed1c116 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -73,6 +74,7 @@ import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -1152,6 +1154,81 @@ public class SinkApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testDeregisterSinkBKPackageCleanup() throws IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+
+        String packagePath = 
"public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        PowerMockito.verifyStatic(WorkerUtils.class, times(1));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterBuiltinSinkBKPackageCleanup() throws IOException 
{
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+
+        String packagePath = String.format("%s://data-generator", 
Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        // if the sink is a builtin sink we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterHTTPSinkBKPackageCleanup() throws IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+
+        String packagePath = String.format("http://foo.com/connector.jar";, 
Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        // if the sink is a is download from a http url, we shouldn't try to 
clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterFileSinkBKPackageCleanup() throws IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
+
+        String packagePath = String.format("file://foo/connector.jar", 
Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSink();
+
+        // if the sink package has a file url, we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
     //
     // Get Sink Info
     //
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cf6945f..3569e6a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
@@ -50,6 +51,7 @@ import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1182,6 +1184,81 @@ public class SourceApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testDeregisterSourceBKPackageCleanup() throws IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
+
+        String packagePath = 
"public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        PowerMockito.verifyStatic(WorkerUtils.class, times(1));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterBuiltinSourceBKPackageCleanup() throws 
IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
+
+        String packagePath = String.format("%s://data-generator", 
Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        // if the source is a builtin source we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterHTTPSourceBKPackageCleanup() throws IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
+
+        String packagePath = String.format("http://foo.com/connector.jar";, 
Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        // if the source is a is download from a http url, we shouldn't try to 
clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
+    @Test
+    public void testDeregisterFileSourceBKPackageCleanup() throws IOException {
+
+        mockStatic(WorkerUtils.class);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
+
+        String packagePath = String.format("file://foo/connector.jar", 
Utils.BUILTIN);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(source)))
+                .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
+                        
PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+
+        deregisterDefaultSource();
+
+        // if the source has a file url, we shouldn't try to clean it up
+        PowerMockito.verifyStatic(WorkerUtils.class, times(0));
+        WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
+    }
+
     //
     // Get Source Info
     //

Reply via email to