This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 7350ae4e3ba [fix][functions] Fix the download of builtin Functions (#17877)
7350ae4e3ba is described below
commit 7350ae4e3ba446e0e4556c0e32a0399e93062bf0
Author: Christophe Bornet <[email protected]>
AuthorDate: Fri Sep 30 22:05:35 2022 +0200
[fix][functions] Fix the download of builtin Functions (#17877)
---
.../functions/worker/rest/api/ComponentImpl.java | 53 ++++---
.../rest/api/v3/FunctionApiV3ResourceTest.java | 158 ++++++++++++++-------
2 files changed, 144 insertions(+), 67 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 239e1d7b0fd..4829289aee2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -47,7 +47,6 @@ import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -102,7 +101,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -1436,14 +1435,22 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
String.format("%s %s doesn't exist",
ComponentTypeUtils.toString(componentType), componentName));
}
- String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant,
namespace, componentName)
- .getPackageLocation().getPackagePath();
+ FunctionMetaData functionMetaData =
+ functionMetaDataManager.getFunctionMetaData(tenant, namespace,
componentName);
+ String pkgPath =
functionMetaData.getPackageLocation().getPackagePath();
+
+ FunctionDetails.ComponentType componentType =
+
InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails());
- return getStreamingOutput(pkgPath);
+ return getStreamingOutput(pkgPath, componentType);
}
private StreamingOutput getStreamingOutput(String pkgPath) {
- final StreamingOutput streamingOutput = output -> {
+ return getStreamingOutput(pkgPath, null);
+ }
+
+ private StreamingOutput getStreamingOutput(String pkgPath,
FunctionDetails.ComponentType componentType) {
+ return output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = URI.create(pkgPath).toURL();
try (InputStream inputStream = url.openStream()) {
@@ -1455,15 +1462,7 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
Files.copy(file.toPath(), output);
} else if (pkgPath.startsWith(Utils.BUILTIN)
&&
!worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
- String sType = pkgPath.replaceFirst("^builtin://", "");
- final String connectorsDir =
worker().getWorkerConfig().getConnectorsDirectory();
- log.warn("Processing package {} ; looking at the dir {}",
pkgPath, connectorsDir);
- TreeMap<String, FunctionArchive> sinksOrSources =
- FunctionUtils.searchForFunctions(connectorsDir, true);
- Path narPath = sinksOrSources.get(sType).getArchivePath();
- if (narPath == null) {
- throw new IllegalStateException("Didn't find " + pkgPath +
" in " + connectorsDir);
- }
+ Path narPath = getBuiltinArchivePath(pkgPath, componentType);
log.info("Loading {} from {}", pkgPath, narPath);
try (InputStream in = new FileInputStream(narPath.toString()))
{
IOUtils.copy(in, output, 1024);
@@ -1477,14 +1476,34 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
output.flush();
}
} catch (Exception e) {
- log.error("Failed download package {} from packageMangment
Service", pkgPath, e);
+ log.error("Failed download package {} from
packageManagement Service", pkgPath, e);
}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output,
pkgPath);
}
};
- return streamingOutput;
+ }
+
+ private Path getBuiltinArchivePath(String pkgPath,
FunctionDetails.ComponentType componentType) {
+ String type = pkgPath.replaceFirst("^builtin://", "");
+ if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) {
+ Connector connector =
worker().getConnectorsManager().getConnector(type);
+ if (connector != null) {
+ return connector.getArchivePath();
+ }
+ if (componentType != null) {
+ throw new IllegalStateException("Didn't find " + type + " in
built-in connectors");
+ }
+ }
+ FunctionArchive function =
worker().getFunctionsManager().getFunction(type);
+ if (function != null) {
+ return function.getArchivePath();
+ }
+ if (componentType != null) {
+ throw new IllegalStateException("Didn't find " + type + " in
built-in functions");
+ }
+ throw new IllegalStateException("Didn't find " + type + " in built-in
connectors or functions");
}
@Override
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index a868d632506..853127cb22f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
@@ -29,7 +28,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
@@ -44,7 +42,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Consumer;
import javax.ws.rs.core.Response;
@@ -78,7 +75,8 @@ import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
@@ -1604,20 +1602,13 @@ public class FunctionApiV3ResourceTest {
String jarHttpUrl =
"https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
String testDir =
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- PulsarWorkerService worker = mock(PulsarWorkerService.class);
- doReturn(true).when(worker).isInitialized();
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
- when(worker.getWorkerConfig()).thenReturn(config);
- FunctionsImpl function = new FunctionsImpl(() -> worker);
- StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl,
null, null);
+
+ StreamingOutput streamOutput = resource.downloadFunction(jarHttpUrl,
null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- pkgFile.delete();
- }
+ pkgFile.delete();
}
@Test
@@ -1626,53 +1617,61 @@ public class FunctionApiV3ResourceTest {
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath().replace('\\', '/');
String testDir =
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- PulsarWorkerService worker = mock(PulsarWorkerService.class);
- doReturn(true).when(worker).isInitialized();
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
- when(worker.getWorkerConfig()).thenReturn(config);
- FunctionsImpl function = new FunctionsImpl(() -> worker);
- StreamingOutput streamOutput = function.downloadFunction("file:///" +
fileLocation, null, null);
+
+ StreamingOutput streamOutput = resource.downloadFunction("file:///" +
fileLocation, null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- pkgFile.delete();
- }
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
}
@Test
- public void testDownloadFunctionBuiltin() throws Exception {
- mockStatic(WorkerUtils.class, ctx -> {
- });
-
+ public void testDownloadFunctionBuiltinConnector() throws Exception {
URL fileUrl =
getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String testDir =
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- PulsarWorkerService worker = mock(PulsarWorkerService.class);
- doReturn(true).when(worker).isInitialized();
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
- WorkerConfig config = mock(WorkerConfig.class);
- when(config.isAuthorizationEnabled()).thenReturn(false);
- when(config.getUploadBuiltinSinksSources()).thenReturn(false);
- when(config.getConnectorsDirectory()).thenReturn("/connectors");
+ Connector connector =
Connector.builder().archivePath(file.toPath()).build();
+ ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
+
when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
+
when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
- when(worker.getDlogNamespace()).thenReturn(mock(Namespace.class));
- when(worker.getWorkerConfig()).thenReturn(config);
- FunctionsImpl function = new FunctionsImpl(() -> worker);
+ StreamingOutput streamOutput =
resource.downloadFunction("builtin://cassandra", null, null);
- TreeMap<String, FunctionArchive> functions = new TreeMap<>();
- FunctionArchive functionArchive =
FunctionArchive.builder().archivePath(file.toPath()).build();
- functions.put("cassandra", functionArchive);
+ File pkgFile = new File(testDir, UUID.randomUUID().toString());
+ OutputStream output = new FileOutputStream(pkgFile);
+ streamOutput.write(output);
+ output.flush();
+ output.close();
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
+ }
- mockStatic(FunctionUtils.class, ctx -> {
- ctx.when(() -> FunctionUtils.searchForFunctions(anyString(),
anyBoolean())).thenReturn(functions);
+ @Test
+ public void testDownloadFunctionBuiltinFunction() throws Exception {
+ URL fileUrl =
getClass().getClassLoader().getResource("test_worker_config.yml");
+ File file = Paths.get(fileUrl.toURI()).toFile();
+ String testDir =
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- });
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+ FunctionsManager functionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive =
FunctionArchive.builder().archivePath(file.toPath()).build();
+
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
- StreamingOutput streamOutput =
function.downloadFunction("builtin://cassandra", null, null);
+ StreamingOutput streamOutput =
resource.downloadFunction("builtin://exclamation", null, null);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
@@ -1680,12 +1679,71 @@ public class FunctionApiV3ResourceTest {
output.flush();
output.close();
Assert.assertTrue(pkgFile.exists());
- if (pkgFile.exists()) {
- Assert.assertEquals(file.length(), pkgFile.length());
- pkgFile.delete();
- } else {
- fail("expected file");
- }
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
+ }
+
+ @Test
+ public void testDownloadFunctionBuiltinConnectorByName() throws Exception {
+ URL fileUrl =
getClass().getClassLoader().getResource("test_worker_config.yml");
+ File file = Paths.get(fileUrl.toURI()).toFile();
+ String testDir =
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(function))).thenReturn(true);
+
+ FunctionMetaData metaData = FunctionMetaData.newBuilder()
+
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://cassandra"))
+
.setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.SINK))
+ .build();
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace),
eq(function))).thenReturn(metaData);
+
+ Connector connector =
Connector.builder().archivePath(file.toPath()).build();
+ ConnectorsManager connectorsManager = mock(ConnectorsManager.class);
+
when(connectorsManager.getConnector("cassandra")).thenReturn(connector);
+
when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
+
+ StreamingOutput streamOutput = resource.downloadFunction(tenant,
namespace, function, null, null);
+ File pkgFile = new File(testDir, UUID.randomUUID().toString());
+ OutputStream output = new FileOutputStream(pkgFile);
+ streamOutput.write(output);
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
+ }
+
+ @Test
+ public void testDownloadFunctionBuiltinFunctionByName() throws Exception {
+ URL fileUrl =
getClass().getClassLoader().getResource("test_worker_config.yml");
+ File file = Paths.get(fileUrl.toURI()).toFile();
+ String testDir =
FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ WorkerConfig config = new WorkerConfig()
+ .setUploadBuiltinSinksSources(false);
+ when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(function))).thenReturn(true);
+
+ FunctionMetaData metaData = FunctionMetaData.newBuilder()
+
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("builtin://exclamation"))
+
.setFunctionDetails(FunctionDetails.newBuilder().setComponentType(FunctionDetails.ComponentType.FUNCTION))
+ .build();
+ when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace),
eq(function))).thenReturn(metaData);
+
+ FunctionsManager functionsManager = mock(FunctionsManager.class);
+ FunctionArchive functionArchive =
FunctionArchive.builder().archivePath(file.toPath()).build();
+
when(functionsManager.getFunction("exclamation")).thenReturn(functionArchive);
+
when(mockedWorkerService.getConnectorsManager()).thenReturn(mock(ConnectorsManager.class));
+
when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
+
+ StreamingOutput streamOutput = resource.downloadFunction(tenant,
namespace, function, null, null);
+ File pkgFile = new File(testDir, UUID.randomUUID().toString());
+ OutputStream output = new FileOutputStream(pkgFile);
+ streamOutput.write(output);
+ Assert.assertTrue(pkgFile.exists());
+ Assert.assertEquals(file.length(), pkgFile.length());
+ pkgFile.delete();
}
@Test