This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c3bbc0913bddd188305017b710700e84afe6763 Author: Boyang Jerry Peng <[email protected]> AuthorDate: Sun Jan 24 18:42:12 2021 -0800 Fix: don't attempt to clean up packages when Source/Sink is builtin (#9289) ### Motivation For Pulsar Functions / IO, don't attempt to clean up packages when Source/Sink is builtin. Though it doesn't really cause any problems even if we do just a nasty exception gets logged. (cherry picked from commit 47b05c45000371ad26f5a197023e34ade7834fb2) --- .../functions/worker/rest/api/ComponentImpl.java | 5 +- .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 77 ++++++++++++++++++++++ .../rest/api/v3/SourceApiV3ResourceTest.java | 77 ++++++++++++++++++++++ 3 files changed, 158 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 29766e6..56771f6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -400,7 +400,10 @@ public abstract class ComponentImpl { ComponentTypeUtils.toString(componentType), tenant, namespace, componentName)); // clean up component files stored in BK - if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) { + String functionPackagePath = functionMetaData.getPackageLocation().getPackagePath(); + if (!functionPackagePath.startsWith(Utils.HTTP) + && !functionPackagePath.startsWith(Utils.FILE) + && !functionPackagePath.startsWith(Utils.BUILTIN)) { try { WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath()); } catch (IOException e) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 6b55106..ed1c116 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -73,6 +74,7 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.doNothing; import static org.powermock.api.mockito.PowerMockito.doReturn; @@ -1152,6 +1154,81 @@ public class SinkApiV3ResourceTest { } } + @Test + public void testDeregisterSinkBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + + String packagePath = "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar"; + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSink(); + + PowerMockito.verifyStatic(WorkerUtils.class, times(1)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + + @Test + public void testDeregisterBuiltinSinkBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + + String packagePath = String.format("%s://data-generator", Utils.BUILTIN); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSink(); + + // if the sink is a builtin sink we shouldn't try to clean it up + PowerMockito.verifyStatic(WorkerUtils.class, times(0)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + + @Test + public void testDeregisterHTTPSinkBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + + String packagePath = String.format("http://foo.com/connector.jar", Utils.BUILTIN); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSink(); + + // if the sink is a is download from a http url, we shouldn't try to clean it up + PowerMockito.verifyStatic(WorkerUtils.class, times(0)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + + @Test + public void testDeregisterFileSinkBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); + + String packagePath = String.format("file://foo/connector.jar", Utils.BUILTIN); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSink(); + + // if the sink package has a file url, we shouldn't try to clean it up + PowerMockito.verifyStatic(WorkerUtils.class, times(0)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + // // Get Sink Info // diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index cf6945f..3569e6a 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.doNothing; import static org.powermock.api.mockito.PowerMockito.doReturn; @@ -50,6 +51,7 @@ import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -1182,6 +1184,81 @@ public class SourceApiV3ResourceTest { } } + @Test + public void testDeregisterSourceBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + + String packagePath = "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar"; + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSource(); + + PowerMockito.verifyStatic(WorkerUtils.class, times(1)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + + @Test + public void testDeregisterBuiltinSourceBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + + String packagePath = String.format("%s://data-generator", Utils.BUILTIN); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSource(); + + // if the source is a builtin source we shouldn't try to clean it up + PowerMockito.verifyStatic(WorkerUtils.class, times(0)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + + @Test + public void testDeregisterHTTPSourceBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + + String packagePath = String.format("http://foo.com/connector.jar", Utils.BUILTIN); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSource(); + + // if the source is a is download from a http url, we shouldn't try to clean it up + PowerMockito.verifyStatic(WorkerUtils.class, times(0)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + + @Test + public void testDeregisterFileSourceBKPackageCleanup() throws IOException { + + mockStatic(WorkerUtils.class); + + when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); + + String packagePath = String.format("file://foo/connector.jar", Utils.BUILTIN); + when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))) + .thenReturn(FunctionMetaData.newBuilder().setPackageLocation( + PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build()); + + deregisterDefaultSource(); + + // if the source has a file url, we shouldn't try to clean it up + PowerMockito.verifyStatic(WorkerUtils.class, times(0)); + WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)); + } + // // Get Source Info //
