This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d23b7604d5a [fix][functions] Fix K8S download function method with auth enabled (#17597)
d23b7604d5a is described below
commit d23b7604d5ad919e93f73c044de04f6c58b702ae
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Sep 13 09:04:30 2022 +0200
[fix][functions] Fix K8S download function method with auth enabled (#17597)
---
.../runtime/kubernetes/KubernetesRuntime.java | 24 +++----
.../runtime/kubernetes/KubernetesRuntimeTest.java | 74 ++++++++++++++++++++--
2 files changed, 83 insertions(+), 15 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index f7791e716d9..e6e85d66d0e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -875,17 +875,7 @@ public class KubernetesRuntime implements Runtime {
ArrayList<String> cmd = new ArrayList<>(Arrays.asList(
pulsarRootDir + configAdminCLI,
"--admin-url",
- pulsarAdminUrl,
- "functions",
- "download",
- "--tenant",
- tenant,
- "--namespace",
- namespace,
- "--name",
- name,
- "--destination-file",
- userCodeFilePath));
+ pulsarAdminUrl));
// add auth plugin and parameters if necessary
if (authenticationEnabled && authConfig != null) {
@@ -900,6 +890,18 @@ public class KubernetesRuntime implements Runtime {
}
}
+ cmd.addAll(Arrays.asList(
+ "functions",
+ "download",
+ "--tenant",
+ tenant,
+ "--namespace",
+ namespace,
+ "--name",
+ name,
+ "--destination-file",
+ userCodeFilePath));
+
if (transformFunction) {
cmd.add("--transform-function");
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 33451a316e4..cf86623eafd 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static
org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
@@ -183,10 +184,21 @@ public class KubernetesRuntimeTest {
}
}
+
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
double
cpuOverCommitRatio, double memoryOverCommitRatio,
Optional<RuntimeCustomizer> manifestCustomizer,
String
downloadDirectory) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir,
percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio,
+ manifestCustomizer, downloadDirectory, null, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
+ double
cpuOverCommitRatio, double memoryOverCommitRatio,
+
Optional<RuntimeCustomizer> manifestCustomizer,
+ String
downloadDirectory,
+
Consumer<WorkerConfig> workerConfigConsumer,
+
AuthenticationConfig authenticationConfig) throws Exception {
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
doNothing().when(factory).setupClient();
@@ -226,7 +238,11 @@ public class KubernetesRuntimeTest {
manifestCustomizer.ifPresent(runtimeCustomizer ->
runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
- factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
+ if (workerConfigConsumer != null) {
+ workerConfigConsumer.accept(workerConfig);
+ }
+
+ factory.initialize(workerConfig, authenticationConfig, new
TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
Mockito.mock(FunctionsManager.class), Optional.empty(),
manifestCustomizer);
return factory;
}
@@ -234,7 +250,16 @@ public class KubernetesRuntimeTest {
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
double
cpuOverCommitRatio, double memoryOverCommitRatio,
Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
- return createKubernetesRuntimeFactory(extraDepsDir,
percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio,
manifestCustomizer, null);
+ return createKubernetesRuntimeFactory(extraDepsDir,
percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio,
manifestCustomizer,
+ null, null, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
+ double
cpuOverCommitRatio, double memoryOverCommitRatio,
+
Optional<RuntimeCustomizer> manifestCustomizer,
+
Consumer<WorkerConfig> workerConfigConsumer) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir,
percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio,
manifestCustomizer,
+ null, workerConfigConsumer, null);
}
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
@@ -810,6 +835,33 @@ public class KubernetesRuntimeTest {
assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(),
"my-service-account");
}
+ @Test
+ public void testCustomKubernetesDownloadCommandsWithAuth() throws
Exception {
+ InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
config.setFunctionAuthenticationSpec(Function.FunctionAuthenticationSpec.newBuilder().build());
+
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA,
false));
+
+ factory = createKubernetesRuntimeFactory(null,
+ 10, 1.0, 1.0, Optional.empty(), null, wconfig -> {
+ wconfig.setAuthenticationEnabled(true);
+ }, AuthenticationConfig.builder()
+ .clientAuthenticationPlugin("com.MyAuth")
+ .clientAuthenticationParameters("{\"authParam1\":
\"authParamValue1\"}")
+ .build());
+
+ KubernetesRuntime container = factory.createContainer(config,
userJarFile, userJarFile, null, null, 30l);
+ V1StatefulSet spec = container.createStatefulSet();
+ String expectedDownloadCommand = "pulsar-admin --admin-url " +
pulsarAdminUrl
+ + " --auth-plugin com.MyAuth --auth-params {\"authParam1\":
\"authParamValue1\"}"
+ + " functions download "
+ + "--tenant " + TEST_TENANT
+ + " --namespace " + TEST_NAMESPACE
+ + " --name " + TEST_NAME
+ + " --destination-file " + pulsarRootDir + "/" + userJarFile;
+ String containerCommand =
spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
+ assertTrue(containerCommand.contains(expectedDownloadCommand),
"Found:" + containerCommand);
+ }
+
InstanceConfig createGolangInstanceConfig() {
InstanceConfig config = new InstanceConfig();
@@ -915,6 +967,16 @@ public class KubernetesRuntimeTest {
double
cpuOverCommitRatio, double memoryOverCommitRatio,
String
manifestCustomizerClassName,
Map<String,
Object> runtimeCustomizerConfig) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir,
percentMemoryPadding, cpuOverCommitRatio,
+ memoryOverCommitRatio, manifestCustomizerClassName,
runtimeCustomizerConfig, null, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
+ double
cpuOverCommitRatio, double memoryOverCommitRatio,
+ String
manifestCustomizerClassName,
+ Map<String,
Object> runtimeCustomizerConfig,
+
Consumer<WorkerConfig> workerConfigConsumer,
+
AuthenticationConfig authenticationConfig) throws Exception {
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
doNothing().when(factory).setupClient();
@@ -957,8 +1019,11 @@ public class KubernetesRuntimeTest {
manifestCustomizer =
Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
}
+ if (workerConfigConsumer != null) {
+ workerConfigConsumer.accept(workerConfig);
+ }
- factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(),
+ factory.initialize(workerConfig, authenticationConfig, new
TestSecretProviderConfigurator(),
Mockito.mock(ConnectorsManager.class),
Mockito.mock(FunctionsManager.class), Optional.empty(), manifestCustomizer);
return factory;
}
@@ -1094,7 +1159,8 @@ public class KubernetesRuntimeTest {
public void testJavaConstructorWithoutDownloadDirectoryDefined() throws
Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
- factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
Optional.empty(), null);
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+ Optional.empty());
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false,
factory.getDownloadDirectory());
}