This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c27f7a3 Optimize built-in source/sink startup Part 2 (#9500)
c27f7a3 is described below
commit c27f7a340a3c78a3a5004d08b8ac05ed99672ee3
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Feb 8 12:33:06 2021 -0800
Optimize built-in source/sink startup Part 2 (#9500)
* Optimize built-in source/sink startup by eliminating redundant NAR
unpacking and checksum calculation Part 2. Allow ThreadRuntime to use cached
built-in connectors instead of unpacking and loading again.
Co-authored-by: Jerry Peng <[email protected]>
---
.../functions/instance/JavaInstanceRunnable.java | 54 ++----------
.../instance/JavaInstanceRunnableTest.java | 2 +-
.../pulsar/functions/runtime/RuntimeFactory.java | 2 +
.../kubernetes/KubernetesRuntimeFactory.java | 2 +
.../runtime/process/ProcessRuntimeFactory.java | 2 +
.../functions/runtime/thread/ThreadRuntime.java | 95 ++++++++++++++++++----
.../runtime/thread/ThreadRuntimeFactory.java | 15 +++-
.../pulsar/functions/worker/ConnectorsManager.java | 4 +-
.../kubernetes/KubernetesRuntimeFactoryTest.java | 5 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 7 +-
.../runtime/process/ProcessRuntimeTest.java | 4 +-
.../runtime/thread/ThreadRuntimeFactoryTest.java | 2 +
.../pulsar/functions/utils/FunctionCommon.java | 22 +++++
.../pulsar/functions/worker/FunctionActioner.java | 4 +-
.../functions/worker/FunctionRuntimeManager.java | 4 +-
.../pulsar/functions/worker/WorkerUtils.java | 22 -----
.../functions/worker/rest/api/ComponentImpl.java | 8 +-
.../functions/worker/rest/api/FunctionsImpl.java | 2 +-
.../functions/worker/rest/api/SinksImpl.java | 3 +-
.../functions/worker/rest/api/SourcesImpl.java | 2 +-
.../functions/worker/rest/api/WorkerImpl.java | 2 +-
.../worker/FunctionRuntimeManagerTest.java | 2 +
.../worker/rest/api/FunctionsImplTest.java | 4 +-
23 files changed, 155 insertions(+), 114 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c542f05..02083ed 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,6 +74,7 @@ import
org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
@@ -85,8 +87,6 @@ import org.slf4j.LoggerFactory;
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final InstanceConfig instanceConfig;
- private final FunctionCacheManager fnCache;
- private final String jarFile;
// input topic consumer & output topic producer
private final PulsarClientImpl client;
@@ -124,7 +124,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private final ClassLoader instanceClassLoader;
private ClassLoader functionClassLoader;
- private String narExtractionDirectory;
// a flog to determine if member variables have been initialized as part
of setup().
// used for out of band API calls like operations involving stats
@@ -134,23 +133,19 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private ReadWriteLock statsLock = new ReentrantReadWriteLock();
public JavaInstanceRunnable(InstanceConfig instanceConfig,
- FunctionCacheManager fnCache,
- String jarFile,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry,
- String narExtractionDirectory) {
+ ClassLoader functionClassLoader) {
this.instanceConfig = instanceConfig;
- this.fnCache = fnCache;
- this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
- this.narExtractionDirectory = narExtractionDirectory;
+ this.functionClassLoader = functionClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s",
instanceConfig.getFunctionDetails().getTenant(),
@@ -197,9 +192,6 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
log.info("Starting Java Instance {} : \n Details = {}",
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getFunctionDetails());
- // start the function thread
- functionClassLoader = loadJars();
-
Object object;
if
(instanceConfig.getFunctionDetails().getClassName().equals(org.apache.pulsar.functions.windowing.WindowFunctionExecutor.class.getName()))
{
object = Reflections.createInstance(
@@ -305,35 +297,6 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
}
}
- private ClassLoader loadJars() throws Exception {
- ClassLoader fnClassLoader;
- try {
- log.info("Load JAR: {}", jarFile);
- // Let's first try to treat it as a nar archive
- fnCache.registerFunctionInstanceWithArchive(
- instanceConfig.getFunctionId(),
- instanceConfig.getInstanceName(),
- jarFile, narExtractionDirectory);
- } catch (FileNotFoundException e) {
- // create the function class loader
- fnCache.registerFunctionInstance(
- instanceConfig.getFunctionId(),
- instanceConfig.getInstanceName(),
- Arrays.asList(jarFile),
- Collections.emptyList());
- }
-
- log.info("Initialize function class loader for function {} at function
cache manager, functionClassLoader: {}",
- instanceConfig.getFunctionDetails().getName(),
fnCache.getClassLoader(instanceConfig.getFunctionId()));
-
- fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
- if (null == fnClassLoader) {
- throw new Exception("No function class loader available.");
- }
-
- return fnClassLoader;
- }
-
private void setupStateStore() throws Exception {
this.stateManager = new InstanceStateManager();
@@ -475,14 +438,7 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
stateStoreProvider.close();
}
- if (instanceCache != null) {
- // once the thread quits, clean up the instance
- fnCache.unregisterFunctionInstance(
- instanceConfig.getFunctionId(),
- instanceConfig.getInstanceName());
- log.info("Unloading JAR files for function {}", instanceConfig);
- instanceCache = null;
- }
+ instanceCache = null;
if (logAppender != null) {
removeLogTopicAppender(LoggerContext.getContext());
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 9286b94..56d80ae 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -59,7 +59,7 @@ public class JavaInstanceRunnableTest {
private JavaInstanceRunnable createRunnable(String outputSerde) throws
Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, null, null, null, null, null, null, null, null);
+ config, null, null, null, null, null, null);
return javaInstanceRunnable;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 56fb701..0f034d0 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import java.util.Optional;
@@ -37,6 +38,7 @@ public interface RuntimeFactory extends AutoCloseable {
void initialize(WorkerConfig workerConfig,
AuthenticationConfig authenticationConfig,
SecretsProviderConfigurator secretsProviderConfigurator,
+ ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> authProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) throws
Exception;
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index f001af4..b332722 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import java.lang.reflect.Field;
@@ -129,6 +130,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
@Override
public void initialize(WorkerConfig workerConfig, AuthenticationConfig
authenticationConfig,
SecretsProviderConfigurator
secretsProviderConfigurator,
+ ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> functionAuthProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 04b871c..aaa0004 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import java.nio.file.Paths;
@@ -94,6 +95,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
@Override
public void initialize(WorkerConfig workerConfig, AuthenticationConfig
authenticationConfig,
SecretsProviderConfigurator
secretsProviderConfigurator,
+ ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> authProvider,
Optional<RuntimeCustomizer> runtimeCustomizer) {
ProcessRuntimeFactoryConfig factoryConfig =
RuntimeUtils.getRuntimeFunctionConfig(
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index f8b0a34..a9614a1 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -19,7 +19,12 @@
package org.apache.pulsar.functions.runtime.thread;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import io.prometheus.client.CollectorRegistry;
@@ -28,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -36,6 +42,8 @@ import
org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
/**
* A function container implemented using java thread.
@@ -60,6 +68,8 @@ public class ThreadRuntime implements Runtime {
private SecretsProvider secretsProvider;
private CollectorRegistry collectorRegistry;
private String narExtractionDirectory;
+ private final Optional<ConnectorsManager> connectorsManager;
+
ThreadRuntime(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
@@ -69,7 +79,8 @@ public class ThreadRuntime implements Runtime {
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry,
- String narExtractionDirectory) {
+ String narExtractionDirectory,
+ Optional<ConnectorsManager> connectorsManager) {
this.instanceConfig = instanceConfig;
if (instanceConfig.getFunctionDetails().getRuntime() !=
Function.FunctionDetails.Runtime.JAVA) {
throw new RuntimeException("Thread Container only supports Java
Runtime");
@@ -84,34 +95,82 @@ public class ThreadRuntime implements Runtime {
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
- this.javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig,
- fnCache,
- jarFile,
- pulsarClient,
- pulsarAdmin,
- stateStorageServiceUrl,
- secretsProvider,
- collectorRegistry,
- narExtractionDirectory);
+ this.connectorsManager = connectorsManager;
+ }
+
+ private static ClassLoader getFunctionClassLoader(InstanceConfig
instanceConfig,
+ String jarFile,
+ String
narExtractionDirectory,
+ FunctionCacheManager
fnCache,
+
Optional<ConnectorsManager> connectorsManager) throws Exception {
+
+ if
(FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())
+ && connectorsManager.isPresent()) {
+ switch
(InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails())) {
+ case SOURCE:
+ return connectorsManager.get().getConnector(
+
instanceConfig.getFunctionDetails().getSource().getBuiltin()).getClassLoader();
+ case SINK:
+ return connectorsManager.get().getConnector(
+
instanceConfig.getFunctionDetails().getSink().getBuiltin()).getClassLoader();
+ default:
+ return loadJars(jarFile, instanceConfig,
narExtractionDirectory, fnCache);
+ }
+ } else {
+ return loadJars(jarFile, instanceConfig, narExtractionDirectory,
fnCache);
+ }
+ }
+
+ private static ClassLoader loadJars(String jarFile,
+ InstanceConfig instanceConfig,
+ String narExtractionDirectory,
+ FunctionCacheManager fnCache) throws
Exception {
+ ClassLoader fnClassLoader;
+ try {
+ log.info("Load JAR: {}", jarFile);
+ // Let's first try to treat it as a nar archive
+ fnCache.registerFunctionInstanceWithArchive(
+ instanceConfig.getFunctionId(),
+ instanceConfig.getInstanceName(),
+ jarFile, narExtractionDirectory);
+ } catch (FileNotFoundException e) {
+ // create the function class loader
+ fnCache.registerFunctionInstance(
+ instanceConfig.getFunctionId(),
+ instanceConfig.getInstanceName(),
+ Arrays.asList(jarFile),
+ Collections.emptyList());
+ }
+
+ log.info("Initialize function class loader for function {} at function
cache manager, functionClassLoader: {}",
+ instanceConfig.getFunctionDetails().getName(),
fnCache.getClassLoader(instanceConfig.getFunctionId()));
+
+ fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+ if (null == fnClassLoader) {
+ throw new Exception("No function class loader available.");
+ }
+
+ return fnClassLoader;
}
/**
* The core logic that initialize the thread container and executes the
function.
*/
@Override
- public void start() {
+ public void start() throws Exception {
+
+ // extract class loader for function
+ ClassLoader functionClassLoader =
getFunctionClassLoader(instanceConfig, jarFile, narExtractionDirectory,
fnCache, connectorsManager);
+
// re-initialize JavaInstanceRunnable so that variables in constructor
can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
- fnCache,
- jarFile,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
secretsProvider,
collectorRegistry,
- narExtractionDirectory);
+ functionClassLoader);
log.info("ThreadContainer starting function with instance config {}",
instanceConfig);
this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
String.format("%s-%s",
@@ -145,6 +204,12 @@ public class ThreadRuntime implements Runtime {
}
// make sure JavaInstanceRunnable is closed
this.javaInstanceRunnable.close();
+
+ log.info("Unloading JAR files for function {}", instanceConfig);
+ // once the thread quits, clean up the instance
+ fnCache.unregisterFunctionInstance(
+ instanceConfig.getFunctionId(),
+ instanceConfig.getInstanceName());
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index f5e3736..d44cf1b 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -41,6 +41,7 @@ import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import
org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import java.util.Optional;
@@ -64,6 +65,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private volatile boolean closed;
private SecretsProviderConfigurator secretsProviderConfigurator;
private ClassLoader rootClassLoader;
+ private Optional<ConnectorsManager> connectorsManager;
/**
* This constructor is used by other runtimes (e.g. ProcessRuntime and
KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of
the function.
@@ -76,13 +78,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
String pulsarWebServiceUrl) throws Exception {
initialize(threadGroupName, Optional.empty(), pulsarServiceUrl,
authConfig,
storageServiceUrl, null, secretsProvider, collectorRegistry,
narExtractionDirectory,
- rootClassLoader, exposePulsarAdminClientEnabled,
pulsarWebServiceUrl);
+ rootClassLoader, exposePulsarAdminClientEnabled,
pulsarWebServiceUrl, Optional.empty());
}
private void initialize(String threadGroupName,
Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String
pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl,
SecretsProviderConfigurator
secretsProviderConfigurator, SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry, String
narExtractionDirectory,
- ClassLoader rootClassLoader, boolean
exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws
PulsarClientException {
+ ClassLoader rootClassLoader, boolean
exposePulsarAdminClientEnabled,
+ String pulsarWebServiceUrl,
Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {
+
if (rootClassLoader == null) {
rootClassLoader = Thread.currentThread().getContextClassLoader();
}
@@ -97,6 +101,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
+ this.connectorsManager = connectorsManager;
}
private Optional<Long>
calculateClientMemoryLimit(Optional<ThreadRuntimeFactoryConfig.MemoryLimit>
memoryLimit) {
@@ -134,6 +139,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
@Override
public void initialize(WorkerConfig workerConfig, AuthenticationConfig
authenticationConfig,
SecretsProviderConfigurator
secretsProviderConfigurator,
+ ConnectorsManager connectorsManager,
Optional<FunctionAuthProvider> functionAuthProvider,
Optional<RuntimeCustomizer> runtimeCustomizer)
throws Exception {
ThreadRuntimeFactoryConfig factoryConfig =
RuntimeUtils.getRuntimeFunctionConfig(
@@ -143,7 +149,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
workerConfig.getPulsarServiceUrl(), authenticationConfig,
workerConfig.getStateStorageServiceUrl(),
secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null,
- workerConfig.isExposeAdminClientEnabled(),
workerConfig.getPulsarWebServiceUrl());
+ workerConfig.isExposeAdminClientEnabled(),
workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager));
}
@Override
@@ -169,7 +175,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
storageServiceUrl,
secretsProvider,
collectorRegistry,
- narExtractionDirectory);
+ narExtractionDirectory,
+ connectorsManager);
}
@Override
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
similarity index 96%
rename from
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
rename to
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 856c8d6..0afffa5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -33,6 +34,7 @@ import java.util.stream.Collectors;
@Slf4j
public class ConnectorsManager {
+ @Getter
private volatile TreeMap<String, Connector> connectors;
public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
@@ -47,7 +49,7 @@ public class ConnectorsManager {
return connectors.get(connectorType).getConnectorDefinition();
}
- public List<ConnectorDefinition> getConnectors() {
+ public List<ConnectorDefinition> getConnectorDefinitions() {
return connectors.values().stream().map(connector ->
connector.getConnectorDefinition()).collect(Collectors.toList());
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index dec1014..9870c3f 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
@@ -181,7 +182,7 @@ public class KubernetesRuntimeFactoryTest {
workerConfig.setStateStorageServiceUrl(null);
workerConfig.setAuthenticationEnabled(false);
- factory.initialize(workerConfig,null, new
TestSecretProviderConfigurator(), functionAuthProvider, manifestCustomizer);
+ factory.initialize(workerConfig,null, new
TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
functionAuthProvider, manifestCustomizer);
return factory;
}
@@ -383,7 +384,7 @@ public class KubernetesRuntimeFactoryTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig,
Map.class));
AuthenticationConfig authenticationConfig =
AuthenticationConfig.builder().build();
- kubernetesRuntimeFactory.initialize(workerConfig,
authenticationConfig, new DefaultSecretsProviderConfigurator(),
Optional.empty(), Optional.empty());
+ kubernetesRuntimeFactory.initialize(workerConfig,
authenticationConfig, new DefaultSecretsProviderConfigurator(),
Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty());
return kubernetesRuntimeFactory;
}
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 5c1c901..54c72cb 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -39,7 +39,9 @@ import
org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -219,7 +221,7 @@ public class KubernetesRuntimeTest {
manifestCustomizer.ifPresent(runtimeCustomizer ->
runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
- factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+ factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
Optional.empty(), manifestCustomizer);
return factory;
}
@@ -910,7 +912,8 @@ public class KubernetesRuntimeTest {
manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
}
- factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+ factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(),
+ Mockito.mock(ConnectorsManager.class), Optional.empty(),
manifestCustomizer);
return factory;
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 66090f5..69336fa 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -49,7 +49,9 @@ import
org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -170,7 +172,7 @@ public class ProcessRuntimeTest {
workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(processRuntimeFactoryConfig,
Map.class));
- processRuntimeFactory.initialize(workerConfig, null, new
TestSecretsProviderConfigurator(), Optional.empty(), Optional.empty());
+ processRuntimeFactory.initialize(workerConfig, null, new
TestSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
Optional.empty(), Optional.empty());
return processRuntimeFactory;
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index d0e2596..8346e9d 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.instance.InstanceUtils;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
@@ -145,6 +146,7 @@ public class ThreadRuntimeFactoryTest {
workerConfig,
Mockito.mock(AuthenticationConfig.class),
Mockito.mock(SecretsProviderConfigurator.class),
+ Mockito.mock(ConnectorsManager.class),
Optional.empty(), Optional.empty());
return clientBuilder;
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 41dbcbd..7858663 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -483,4 +483,26 @@ public class FunctionCommon {
public static String capFirstLetter(Enum en) {
return StringUtils.capitalize(en.toString().toLowerCase());
}
+
+ public static boolean
isFunctionCodeBuiltin(org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder
functionDetails) {
+ if (functionDetails.hasSource()) {
+ org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec =
functionDetails.getSource();
+ if (!isEmpty(sourceSpec.getBuiltin())) {
+ return true;
+ }
+ }
+
+ if (functionDetails.hasSink()) {
+ org.apache.pulsar.functions.proto.Function.SinkSpec sinkSpec =
functionDetails.getSink();
+ if (!isEmpty(sinkSpec.getBuiltin())) {
+ return true;
+ }
+ }
+
+ if (!isEmpty(functionDetails.getBuiltin())) {
+ return true;
+ }
+
+ return false;
+ }
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index e051f12..73f8f95 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -48,7 +47,6 @@ import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.Connector;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import java.io.File;
import java.io.FileNotFoundException;
@@ -120,7 +118,7 @@ public class FunctionActioner {
URL url = new URL(pkgLocation);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
- } else if (WorkerUtils.isFunctionCodeBuiltin(functionDetails))
{
+ } else if
(FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
File pkgFile =
getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
packageFile = pkgFile.getAbsolutePath();
} else {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 67b72be..c95475a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -206,7 +206,9 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
}
// initialize runtime
- this.runtimeFactory.initialize(workerConfig, authConfig,
secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);
+ this.runtimeFactory.initialize(workerConfig, authConfig,
+ secretsProviderConfigurator, connectorsManager,
+ functionAuthProvider, runtimeCustomizer);
this.functionActioner = new FunctionActioner(this.workerConfig,
runtimeFactory,
dlogNamespace, connectorsManager, functionsManager,
workerService.getBrokerAdmin());
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 5e65990..821a8fc 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -314,28 +314,6 @@ public final class WorkerUtils {
}
}
- public static boolean
isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetails) {
- if (functionDetails.hasSource()) {
- Function.SourceSpec sourceSpec = functionDetails.getSource();
- if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
- return true;
- }
- }
-
- if (functionDetails.hasSink()) {
- Function.SinkSpec sinkSpec = functionDetails.getSink();
- if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
- return true;
- }
- }
-
- if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
- return true;
- }
-
- return false;
- }
-
public static Reader<byte[]> createReader(ReaderBuilder readerBuilder,
String readerName,
String topic,
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 62747bf..8699ecf 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -26,7 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static
org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
-import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
import io.netty.buffer.ByteBuf;
@@ -59,10 +59,8 @@ import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -78,7 +76,6 @@ import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -101,7 +98,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -900,7 +896,7 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
throwUnavailableException();
}
- return this.worker().getConnectorsManager().getConnectors();
+ return this.worker().getConnectorsManager().getConnectorDefinitions();
}
@Override
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 72253cb..b933c0d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -64,7 +64,7 @@ import java.util.function.Supplier;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index a7909d7..c059e18 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -53,14 +53,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.*;
import java.util.function.Supplier;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 15b5c9e..089b67c 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -59,7 +59,7 @@ import java.util.function.Supplier;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static
org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static
org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index a67d179..7d24d8c 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -208,7 +208,7 @@ public class WorkerImpl implements
Workers<PulsarWorkerService> {
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
- return this.worker().getConnectorsManager().getConnectors();
+ return this.worker().getConnectorsManager().getConnectorDefinitions();
}
public void rebalance(final URI uri, final String clientRole) {
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index facae5a..4214364 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -699,6 +699,7 @@ public class FunctionRuntimeManagerTest {
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
+ any(),
any()
);
doNothing().when(kubernetesRuntimeFactory).setupClient();
@@ -944,6 +945,7 @@ public class FunctionRuntimeManagerTest {
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
+ any(),
any()
);
doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 8e026d9..41755f7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -163,7 +163,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null, null,
null);
+ instanceConfig, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData>
metricsDataCompletableFuture = new
CompletableFuture<InstanceCommunication.MetricsData>();
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
@@ -209,7 +209,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null, null,
null);
+ instanceConfig, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> completableFuture
= new CompletableFuture<InstanceCommunication.MetricsData>();
completableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);