This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bc34b27e1be [fix][fn] Correct TLS cert config translation from broker 
to fn worker (#20297)
bc34b27e1be is described below

commit bc34b27e1beaa57085662b61a640ac19999d5a79
Author: Michael Marshall <[email protected]>
AuthorDate: Thu May 11 17:29:22 2023 -0500

    [fix][fn] Correct TLS cert config translation from broker to fn worker 
(#20297)
    
    ### Motivation
    
    The `initializeWorkerConfigFromBrokerConfig` method converts a 
`ServiceConfiguration` object to a `WorkerConfig`. This conversion is used when 
the function worker is running within the broker and when Pulsar is running in 
standalone mode. The TLS certificates that are trusted by the broker should 
also be trusted by the function worker, and the TLS certificates that are 
trusted by the broker client should be trusted by the function worker's client.
    
    ### Modifications
    
    * Improve the `PulsarFunctionTlsTest` test by adding an Awaitility-based 
wait that retries the function lookup. Before this change, the test was flaky 
on my machine.
    * Fix the mapping of broker config to function worker config.
    
    ### Verifying this change
    
    A test is added. Note that the old test does not currently fail because of 
the misconfiguration; the error only shows up in the logs. Here is one of the 
stack traces that is gone after this change:
    
    ```
    2023-05-10T23:19:27,087 - WARN  - [pulsar-client-io-253-3:ClientCnx@344] - 
[localhost/127.0.0.1:59715] Got exception 
io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: 
General OpenSslEngine problem
            at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:499)
            at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: javax.net.ssl.SSLHandshakeException: General OpenSslEngine 
problem
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.handshakeException(ReferenceCountedOpenSslEngine.java:1907)
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.wrap(ReferenceCountedOpenSslEngine.java:834)
            at java.base/javax.net.ssl.SSLEngine.wrap(SSLEngine.java:564)
            at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1042)
            at 
io.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:928)
            at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1418)
            at 
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1256)
            at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1296)
            at 
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
            at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
            ... 17 more
    Caused by: sun.security.validator.ValidatorException: PKIX path building 
failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
find valid certification path to requested target
            at 
java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
            at 
java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
            at 
java.base/sun.security.validator.Validator.validate(Validator.java:264)
            at 
java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
            at 
java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback.verify(ReferenceCountedOpenSslClientContext.java:234)
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslContext$AbstractCertificateVerifier.verify(ReferenceCountedOpenSslContext.java:779)
            at 
io.netty.internal.tcnative.CertificateVerifierTask.runTask(CertificateVerifierTask.java:36)
            at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:48)
            at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:42)
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.runAndResetNeedTask(ReferenceCountedOpenSslEngine.java:1496)
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslEngine.access$700(ReferenceCountedOpenSslEngine.java:94)
            at 
io.netty.handler.ssl.ReferenceCountedOpenSslEngine$TaskDecorator.run(ReferenceCountedOpenSslEngine.java:1471)
            at 
io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1558)
            at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1404)
            ... 21 more
    Caused by: sun.security.provider.certpath.SunCertPathBuilderException: 
unable to find valid certification path to requested target
            at 
java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
            at 
java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
            at 
java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
            at 
java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
            ... 35 more
    ```
    
    ### Does this pull request potentially affect one of the following parts:
    
    This affects the configuration, but I think it should be classified as 
fixing a bug, so it is not a breaking change.
    
    ### Documentation
    
    - [x] `doc-not-needed`
    
    ### Matching PR in forked repository
    
    PR in forked repository: the modified test passes locally, so skipping 
forked test.
    
    (cherry picked from commit 525dd2f3b541d8bb8fe166b28b79597aae45ef4f)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  3 ++-
 .../functions/worker/PulsarFunctionTlsTest.java    | 23 +++++++++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5fc9920d0f2..f833674ceeb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1859,7 +1859,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         
workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
         workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
         
workerConfig.setTlsEnableHostnameVerification(brokerConfig.isTlsHostnameVerificationEnabled());
-        
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
+        
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getBrokerClientTrustCertsFilePath());
+        
workerConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
 
         // client in worker will use this config to authenticate with broker
         
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 00db6a65b16..246d980d617 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -51,6 +51,7 @@ import 
org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -65,6 +66,7 @@ public class PulsarFunctionTlsTest {
     private static final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
     private static final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
     private static final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+    private static final String CA_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
 
     LocalBookkeeperEnsemble bkEnsemble;
     protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
@@ -111,9 +113,9 @@ public class PulsarFunctionTlsTest {
             config.setAuthenticationProviders(providers);
             config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
             config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-            config.setTlsAllowInsecureConnection(true);
+            config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
             config.setBrokerClientTlsEnabled(true);
-            
config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+            config.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
             
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
             config.setBrokerClientAuthenticationParameters(
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + 
TLS_CLIENT_KEY_FILE_PATH);
@@ -141,7 +143,6 @@ public class PulsarFunctionTlsTest {
             workerConfig.setUseTls(true);
             workerConfig.setTlsEnableHostnameVerification(true);
             workerConfig.setTlsAllowInsecureConnection(false);
-            
workerConfig.setBrokerClientTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
             fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);
 
             configurations[i] = config;
@@ -163,8 +164,7 @@ public class PulsarFunctionTlsTest {
 
             pulsarAdmins[i] = PulsarAdmin.builder()
                 .serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls())
-                .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
-                .allowTlsInsecureConnection(true)
+                .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
                 .authentication(authTls)
                 .build();
         }
@@ -216,6 +216,13 @@ public class PulsarFunctionTlsTest {
         }
     }
 
+    @Test
+    public void testTLSTrustCertsConfigMapping() throws Exception {
+        WorkerConfig workerConfig = fnWorkerServices[0].getWorkerConfig();
+        assertEquals(workerConfig.getTlsTrustCertsFilePath(), 
CA_CERT_FILE_PATH);
+        assertEquals(workerConfig.getBrokerClientTrustCertsFilePath(), 
CA_CERT_FILE_PATH);
+    }
+
     @Test
     public void testFunctionsCreation() throws Exception {
 
@@ -233,6 +240,12 @@ public class PulsarFunctionTlsTest {
                 functionConfig, jarFilePathUrl
             );
 
+            // Function creation is not strongly consistent, so this test can 
fail with a get that is too eager and
+            // does not have retries.
+            final PulsarAdmin admin = pulsarAdmins[i];
+            Awaitility.await().ignoreExceptions()
+                    .untilAsserted(() -> 
admin.functions().getFunction(testTenant, "my-ns", functionName));
+
             FunctionConfig config = 
pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName);
             assertEquals(config.getTenant(), testTenant);
             assertEquals(config.getNamespace(), "my-ns");

Reply via email to