This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c7514fe1067 [improve][cli] Using separate TLS config on the compactor 
(#17426)
c7514fe1067 is described below

commit c7514fe1067771786ec645e21492f40718047d96
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Sep 29 11:49:21 2022 +0800

    [improve][cli] Using separate TLS config on the compactor (#17426)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    
    Signed-off-by: Zixuan Liu <[email protected]>
    
    ### Motivation
    
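    Improve the compactor tool so that its Pulsar client uses the separate broker-client TLS configuration (`brokerClient*` settings) instead of the broker's own TLS settings.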
    
    ### Modifications
    
    - Add a separate TLS config for the compactor; both KeyStore and PEM formats 
are supported
    - Fix the service URL selection: the TLS service URL is now used only when 
`brokerConfig.isBrokerClientTlsEnabled()` is true
    
    ### Verifying this change
    
    Tests have been added.
    
    ### Documentation
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [ ] `doc-required`
    (Your PR needs to update docs and you will update later)
    
    - [x] `doc-not-needed`
    (Please explain why)
    
    - [ ] `doc`
    (Your PR contains doc changes)
    
    - [ ] `doc-complete`
    (Docs have been already added)
    
    (cherry picked from commit 537aa545c70aa0b3023ff026c4a919546340656b)
---
 .../apache/pulsar/compaction/CompactorTool.java    | 73 ++++++++++++----------
 .../pulsar/compaction/CompactorToolTest.java       | 50 +++++++++++++++
 2 files changed, 91 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index a54135de9a8..441d80c374b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -31,13 +31,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -65,6 +65,45 @@ public class CompactorTool {
         private boolean generateDocs = false;
     }
 
+    public static PulsarClient createClient(ServiceConfiguration brokerConfig) 
throws PulsarClientException {
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES);
+
+        // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
+        // @Secret on the ClientConfigurationData object because of the way 
they are serialized.
+        // See https://github.com/apache/pulsar/issues/8509 for more 
information.
+        
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(),
 "brokerClient_"));
+
+        if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
+            
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+                    brokerConfig.getBrokerClientAuthenticationParameters());
+        }
+
+        AdvertisedListener internalListener = 
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
+        if (internalListener.getBrokerServiceUrlTls() != null && 
brokerConfig.isBrokerClientTlsEnabled()) {
+            
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrlTls().toString())
+                    
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
+            if (brokerConfig.isBrokerClientTlsEnabledWithKeyStore()) {
+                clientBuilder.useKeyStoreTls(true)
+                        
.tlsKeyStoreType(brokerConfig.getBrokerClientTlsKeyStoreType())
+                        
.tlsKeyStorePath(brokerConfig.getBrokerClientTlsKeyStore())
+                        
.tlsKeyStorePassword(brokerConfig.getBrokerClientTlsKeyStorePassword())
+                        
.tlsTrustStoreType(brokerConfig.getBrokerClientTlsTrustStoreType())
+                        
.tlsTrustStorePath(brokerConfig.getBrokerClientTlsTrustStore())
+                        
.tlsTrustStorePassword(brokerConfig.getBrokerClientTlsTrustStorePassword());
+            } else {
+                
clientBuilder.tlsTrustCertsFilePath(brokerConfig.getBrokerClientTrustCertsFilePath())
+                        
.tlsKeyFilePath(brokerConfig.getBrokerClientKeyFilePath())
+                        
.tlsCertificateFilePath(brokerConfig.getBrokerClientCertificateFilePath());
+            }
+        } else {
+            internalListener = 
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
+            
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
+        }
+
+        return clientBuilder.build();
+    }
+
     public static void main(String[] args) throws Exception {
         Arguments arguments = new Arguments();
         JCommander jcommander = new JCommander(arguments);
@@ -105,40 +144,10 @@ public class CompactorTool {
             );
         }
 
-        ClientBuilder clientBuilder = PulsarClient.builder()
-                .memoryLimit(0, SizeUnit.BYTES);
-
-        // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
-        // @Secret on the ClientConfigurationData object because of the way 
they are serialized.
-        // See https://github.com/apache/pulsar/issues/8509 for more 
information.
-        
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(),
 "brokerClient_"));
-
-        if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
-            
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
-                    brokerConfig.getBrokerClientAuthenticationParameters());
-        }
-
-        AdvertisedListener internalListener = 
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
-        if (internalListener.getBrokerServiceUrlTls() != null) {
-            log.info("Found a TLS-based advertised listener in configuration 
file. \n"
-                    + "Will connect pulsar use TLS.");
-            clientBuilder
-                    
.serviceUrl(internalListener.getBrokerServiceUrlTls().toString())
-                    
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
-                    
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
-
-        } else {
-            internalListener = 
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
-            
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
-        }
-
         @Cleanup(value = "shutdownNow")
         ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
 
-        @Cleanup(value = "shutdownNow")
-        OrderedScheduler executor = 
OrderedScheduler.newSchedulerBuilder().build();
-
         @Cleanup
         MetadataStoreExtended store = 
MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
@@ -156,7 +165,7 @@ public class CompactorTool {
         BookKeeper bk = bkClientFactory.create(brokerConfig, store, 
eventLoopGroup, Optional.empty(), null);
 
         @Cleanup
-        PulsarClient pulsar = clientBuilder.build();
+        PulsarClient pulsar = createClient(brokerConfig);
 
         Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, 
scheduler);
         long ledgerId = compactor.compact(arguments.topic).get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
index 795cf2b7f7c..85f6f16fdb0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertTrue;
 import com.beust.jcommander.Parameter;
 import java.io.ByteArrayOutputStream;
@@ -25,6 +28,13 @@ import java.io.PrintStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.testng.annotations.Test;
 
@@ -72,4 +82,44 @@ public class CompactorToolTest {
             System.setOut(oldStream);
         }
     }
+
+    @Test
+    public void testUseTlsUrlWithPEM() throws PulsarClientException {
+        ServiceConfiguration serviceConfiguration = 
spy(ServiceConfiguration.class);
+        serviceConfiguration.setBrokerServicePortTls(Optional.of(6651));
+        serviceConfiguration.setBrokerClientTlsEnabled(true);
+        serviceConfiguration.setProperties(new Properties());
+
+        @Cleanup
+        PulsarClient ignored = 
CompactorTool.createClient(serviceConfiguration);
+
+        verify(serviceConfiguration, times(1)).isBrokerClientTlsEnabled();
+        verify(serviceConfiguration, times(1)).isTlsAllowInsecureConnection();
+        verify(serviceConfiguration, times(1)).getBrokerClientKeyFilePath();
+        verify(serviceConfiguration, 
times(1)).getBrokerClientTrustCertsFilePath();
+        verify(serviceConfiguration, 
times(1)).getBrokerClientCertificateFilePath();
+    }
+
+    @Test
+    public void testUseTlsUrlWithKeystore() throws PulsarClientException {
+        ServiceConfiguration serviceConfiguration = 
spy(ServiceConfiguration.class);
+        serviceConfiguration.setBrokerServicePortTls(Optional.of(6651));
+        serviceConfiguration.setBrokerClientTlsEnabled(true);
+        serviceConfiguration.setBrokerClientTlsEnabledWithKeyStore(true);
+        
serviceConfiguration.setBrokerClientTlsTrustStore(MockedPulsarServiceBaseTest.BROKER_KEYSTORE_FILE_PATH);
+
+        serviceConfiguration.setProperties(new Properties());
+
+        @Cleanup
+        PulsarClient ignored = 
CompactorTool.createClient(serviceConfiguration);
+
+        verify(serviceConfiguration, times(1)).isBrokerClientTlsEnabled();
+        verify(serviceConfiguration, 
times(1)).isBrokerClientTlsEnabledWithKeyStore();
+        verify(serviceConfiguration, times(1)).getBrokerClientTlsKeyStore();
+        verify(serviceConfiguration, 
times(1)).getBrokerClientTlsKeyStorePassword();
+        verify(serviceConfiguration, 
times(1)).getBrokerClientTlsKeyStoreType();
+        verify(serviceConfiguration, times(1)).getBrokerClientTlsTrustStore();
+        verify(serviceConfiguration, 
times(1)).getBrokerClientTlsTrustStorePassword();
+        verify(serviceConfiguration, 
times(1)).getBrokerClientTlsTrustStoreType();
+    }
 }

Reply via email to