This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new bc34b27e1be [fix][fn] Correct TLS cert config translation from broker
to fn worker (#20297)
bc34b27e1be is described below
commit bc34b27e1beaa57085662b61a640ac19999d5a79
Author: Michael Marshall <[email protected]>
AuthorDate: Thu May 11 17:29:22 2023 -0500
[fix][fn] Correct TLS cert config translation from broker to fn worker
(#20297)
### Motivation
The `initializeWorkerConfigFromBrokerConfig` method converts a
`ServiceConfiguration` object to a `WorkerConfig`. This is used when the
function worker is running within the broker and when Pulsar is running in
standalone mode. The TLS certificates that are trusted by the broker should
also be trusted by the function worker, and the TLS certificates that are
trusted by the broker client should be trusted by the function worker's client.
### Modifications
* Improve the `PulsarFunctionTlsTest` test by using Awaitility to retry the
`getFunction` call after function creation. Before this change, the test was
flaky on my machine.
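* Fix the mapping of broker config to function worker config: the worker's
`brokerClientTrustCertsFilePath` is now taken from the broker's
`brokerClientTrustCertsFilePath` (previously it was taken from
`tlsTrustCertsFilePath`), and the worker's `tlsTrustCertsFilePath` is now set
from the broker's `tlsTrustCertsFilePath`.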
### Verifying this change
A test is added. Note that the old test does not actually fail because of the
misconfiguration; the error only shows up in the logs. Here is one of the
stack traces that is gone after this change:
```
2023-05-10T23:19:27,087 - WARN - [pulsar-client-io-253-3:ClientCnx@344] -
[localhost/127.0.0.1:59715] Got exception
io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException:
General OpenSslEngine problem
at
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:499)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: javax.net.ssl.SSLHandshakeException: General OpenSslEngine
problem
at
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.handshakeException(ReferenceCountedOpenSslEngine.java:1907)
at
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.wrap(ReferenceCountedOpenSslEngine.java:834)
at java.base/javax.net.ssl.SSLEngine.wrap(SSLEngine.java:564)
at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1042)
at
io.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:928)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1418)
at
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1256)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1296)
at
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
... 17 more
Caused by: sun.security.validator.ValidatorException: PKIX path building
failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to
find valid certification path to requested target
at
java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
at
java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
at
java.base/sun.security.validator.Validator.validate(Validator.java:264)
at
java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
at
java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
at
io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback.verify(ReferenceCountedOpenSslClientContext.java:234)
at
io.netty.handler.ssl.ReferenceCountedOpenSslContext$AbstractCertificateVerifier.verify(ReferenceCountedOpenSslContext.java:779)
at
io.netty.internal.tcnative.CertificateVerifierTask.runTask(CertificateVerifierTask.java:36)
at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:48)
at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:42)
at
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.runAndResetNeedTask(ReferenceCountedOpenSslEngine.java:1496)
at
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.access$700(ReferenceCountedOpenSslEngine.java:94)
at
io.netty.handler.ssl.ReferenceCountedOpenSslEngine$TaskDecorator.run(ReferenceCountedOpenSslEngine.java:1471)
at
io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1558)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1404)
... 21 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
unable to find valid certification path to requested target
at
java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at
java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at
java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
at
java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
... 35 more
```
### Does this pull request potentially affect one of the following parts:
This affects the configuration, but I think it should be classified as
fixing a bug, so it is not a breaking change.
### Documentation
- [x] `doc-not-needed`
### Matching PR in forked repository
PR in forked repository: the modified test passes locally, so skipping
forked test.
(cherry picked from commit 525dd2f3b541d8bb8fe166b28b79597aae45ef4f)
---
.../org/apache/pulsar/broker/PulsarService.java | 3 ++-
.../functions/worker/PulsarFunctionTlsTest.java | 23 +++++++++++++++++-----
2 files changed, 20 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5fc9920d0f2..f833674ceeb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1859,7 +1859,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
workerConfig.setTlsEnableHostnameVerification(brokerConfig.isTlsHostnameVerificationEnabled());
-
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
+
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getBrokerClientTrustCertsFilePath());
+
workerConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
// client in worker will use this config to authenticate with broker
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 00db6a65b16..246d980d617 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -51,6 +51,7 @@ import
org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -65,6 +66,7 @@ public class PulsarFunctionTlsTest {
private static final String TLS_SERVER_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/broker-key.pem";
private static final String TLS_CLIENT_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/client-cert.pem";
private static final String TLS_CLIENT_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/client-key.pem";
+ private static final String CA_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/cacert.pem";
LocalBookkeeperEnsemble bkEnsemble;
protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
@@ -111,9 +113,9 @@ public class PulsarFunctionTlsTest {
config.setAuthenticationProviders(providers);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config.setTlsAllowInsecureConnection(true);
+ config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
config.setBrokerClientTlsEnabled(true);
-
config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+ config.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" +
TLS_CLIENT_KEY_FILE_PATH);
@@ -141,7 +143,6 @@ public class PulsarFunctionTlsTest {
workerConfig.setUseTls(true);
workerConfig.setTlsEnableHostnameVerification(true);
workerConfig.setTlsAllowInsecureConnection(false);
-
workerConfig.setBrokerClientTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);
configurations[i] = config;
@@ -163,8 +164,7 @@ public class PulsarFunctionTlsTest {
pulsarAdmins[i] = PulsarAdmin.builder()
.serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls())
- .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
- .allowTlsInsecureConnection(true)
+ .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
.authentication(authTls)
.build();
}
@@ -216,6 +216,13 @@ public class PulsarFunctionTlsTest {
}
}
+ @Test
+ public void testTLSTrustCertsConfigMapping() throws Exception {
+ WorkerConfig workerConfig = fnWorkerServices[0].getWorkerConfig();
+ assertEquals(workerConfig.getTlsTrustCertsFilePath(),
CA_CERT_FILE_PATH);
+ assertEquals(workerConfig.getBrokerClientTrustCertsFilePath(),
CA_CERT_FILE_PATH);
+ }
+
@Test
public void testFunctionsCreation() throws Exception {
@@ -233,6 +240,12 @@ public class PulsarFunctionTlsTest {
functionConfig, jarFilePathUrl
);
+ // Function creation is not strongly consistent, so this test can
fail with a get that is too eager and
+ // does not have retries.
+ final PulsarAdmin admin = pulsarAdmins[i];
+ Awaitility.await().ignoreExceptions()
+ .untilAsserted(() ->
admin.functions().getFunction(testTenant, "my-ns", functionName));
+
FunctionConfig config =
pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName);
assertEquals(config.getTenant(), testTenant);
assertEquals(config.getNamespace(), "my-ns");