This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 2b0659a5da1 [fix][fn] Configure pulsar admin for TLS (#20533)
2b0659a5da1 is described below
commit 2b0659a5da16d25c9f6dd035fd1bbf5d99474fc2
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Jun 7 17:31:10 2023 -0500
[fix][fn] Configure pulsar admin for TLS (#20533)
### Motivation
This PR is a combination of https://github.com/apache/pulsar/pull/20482 and
https://github.com/apache/pulsar/pull/20513 because cherry-picking those PRs
produced too many conflicts.
### Modifications
* Made the same additions as the source PRs. The surrounding code differs on
this branch, so a few extra changes were needed to apply them.
### Documentation
- [x] `doc-not-needed`
(cherry picked from commit 8c9c6e929287e33147c4d8d7217a0aeaef02373d)
---
.../runtime/kubernetes/KubernetesRuntime.java | 110 +++++++++------------
.../runtime/kubernetes/KubernetesRuntimeTest.java | 19 ++++
2 files changed, 67 insertions(+), 62 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index c13a46d6b9b..d79502d6757 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -845,80 +845,66 @@ public class KubernetesRuntime implements Runtime {
}
private List<String> getDownloadCommand(String tenant, String namespace,
String name, String userCodeFilePath) {
+ List<String> result = new ArrayList<>();
+ result.add(pulsarRootDir + configAdminCLI);
// add auth plugin and parameters if necessary
if (authenticationEnabled && authConfig != null) {
- if (isNotBlank(authConfig.getClientAuthenticationPlugin())
- &&
isNotBlank(authConfig.getClientAuthenticationParameters())
- && instanceConfig.getFunctionAuthenticationSpec() != null)
{
- return Arrays.asList(
- pulsarRootDir + configAdminCLI,
- "--auth-plugin",
- authConfig.getClientAuthenticationPlugin(),
- "--auth-params",
- authConfig.getClientAuthenticationParameters(),
- "--admin-url",
- pulsarAdminUrl,
- "functions",
- "download",
- "--tenant",
- tenant,
- "--namespace",
- namespace,
- "--name",
- name,
- "--destination-file",
- userCodeFilePath);
- }
+ result.addAll(getAuthenticationParams(authConfig));
}
-
- return Arrays.asList(
- pulsarRootDir + configAdminCLI,
- "--admin-url",
- pulsarAdminUrl,
- "functions",
- "download",
- "--tenant",
- tenant,
- "--namespace",
- namespace,
- "--name",
- name,
- "--destination-file",
- userCodeFilePath);
+ result.add("--admin-url");
+ result.add(pulsarAdminUrl);
+ result.add("functions");
+ result.add("download");
+ result.add("--tenant");
+ result.add(tenant);
+ result.add("--namespace");
+ result.add(namespace);
+ result.add("--name");
+ result.add(name);
+ result.add("--destination-file");
+ result.add(userCodeFilePath);
+ return result;
}
private List<String> getPackageDownloadCommand(String packageName, String
userCodeFilePath) {
+ List<String> result = new ArrayList<>();
+ result.add(pulsarRootDir + configAdminCLI);
// add auth plugin and parameters if necessary
if (authenticationEnabled && authConfig != null) {
- if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+ result.addAll(getAuthenticationParams(authConfig));
+ }
+ result.add("--admin-url");
+ result.add(pulsarAdminUrl);
+ result.add("packages");
+ result.add("download");
+ result.add(packageName);
+ result.add("--path");
+ result.add(userCodeFilePath);
+ return result;
+ }
+
+ private List<String> getAuthenticationParams(AuthenticationConfig
authConfig) {
+ List<String> result = new ArrayList<>();
+ if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())
&& instanceConfig.getFunctionAuthenticationSpec() != null) {
- return Arrays.asList(
- pulsarRootDir + configAdminCLI,
- "--auth-plugin",
- authConfig.getClientAuthenticationPlugin(),
- "--auth-params",
- authConfig.getClientAuthenticationParameters(),
- "--admin-url",
- pulsarAdminUrl,
- "packages",
- "download",
- packageName,
- "--path",
- userCodeFilePath);
- }
+ result.add("--auth-plugin");
+ result.add(authConfig.getClientAuthenticationPlugin());
+ result.add("--auth-params");
+ result.add(authConfig.getClientAuthenticationParameters());
}
-
- return Arrays.asList(
- pulsarRootDir + configAdminCLI,
- "--admin-url",
- pulsarAdminUrl,
- "packages",
- "download",
- packageName,
- "--path",
- userCodeFilePath);
+ if (authConfig.isTlsAllowInsecureConnection()) {
+ result.add("--tls-allow-insecure");
+ }
+ if (authConfig.isTlsHostnameVerificationEnable()) {
+ result.add("--tls-enable-hostname-verification");
+ }
+ if (isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
+ result.add("--tls-trust-cert-path");
+ result.add(authConfig.getTlsTrustCertsFilePath());
+ }
+ return result;
}
private static String setShardIdEnvironmentVariableCommand() {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 14878e77b81..75bce227c3e 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -1149,6 +1149,25 @@ public class KubernetesRuntimeTest {
String containerCommand =
spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
assertTrue(containerCommand.contains(expectedDownloadCommand));
}
+ @Test
+ public void testCustomKubernetesDownloadWithTLSConfig() throws Exception {
+ String downloadDirectory = "download/pulsar_functions";
+ InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA,
false, (fb) -> {
+ return fb.setPackageUrl("function://public/default/test@v1");
+ }));
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
Optional.empty(), downloadDirectory);
+ factory.setAuthenticationEnabled(true);
+
factory.setAuthConfig(AuthenticationConfig.builder().tlsHostnameVerificationEnable(true).build());
+
+ KubernetesRuntime container = factory.createContainer(config,
userJarFile, userJarFile, 30l);
+ V1StatefulSet spec = container.createStatefulSet();
+ String expectedDownloadCommand = "pulsar-admin
--tls-enable-hostname-verification --admin-url http://localhost:8080 packages
download "
+ + "function://public/default/test@v1 --path " +
factory.getDownloadDirectory() + "/" + userJarFile;
+ String containerCommand =
spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
+ assertTrue(containerCommand.contains(expectedDownloadCommand));
+ }
@Test
public void shouldUseConfiguredMetricsPort() throws Exception {