This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c7514fe1067 [improve][cli] Using separate TLS config on the compactor
(#17426)
c7514fe1067 is described below
commit c7514fe1067771786ec645e21492f40718047d96
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Sep 29 11:49:21 2022 +0800
[improve][cli] Using separate TLS config on the compactor (#17426)
Signed-off-by: Zixuan Liu <[email protected]>
Signed-off-by: Zixuan Liu <[email protected]>
### Motivation
Improve the compactor tool, using separate TLS config
### Modifications
- Add separate TLS config on the compactor, both Keystore and PEM formats
are supported
- Fix correct use of service URL by
`brokerConfig.isBrokerClientTlsEnabled()` value
### Verifying this change
Test has been added.
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
(cherry picked from commit 537aa545c70aa0b3023ff026c4a919546340656b)
---
.../apache/pulsar/compaction/CompactorTool.java | 73 ++++++++++++----------
.../pulsar/compaction/CompactorToolTest.java | 50 +++++++++++++++
2 files changed, 91 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index a54135de9a8..441d80c374b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -31,13 +31,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -65,6 +65,45 @@ public class CompactorTool {
private boolean generateDocs = false;
}
+ public static PulsarClient createClient(ServiceConfiguration brokerConfig)
throws PulsarClientException {
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES);
+
+ // Apply all arbitrary configuration. This must be called before
setting any fields annotated as
+ // @Secret on the ClientConfigurationData object because of the way
they are serialized.
+ // See https://github.com/apache/pulsar/issues/8509 for more
information.
+
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(),
"brokerClient_"));
+
+ if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
+
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+ brokerConfig.getBrokerClientAuthenticationParameters());
+ }
+
+ AdvertisedListener internalListener =
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
+ if (internalListener.getBrokerServiceUrlTls() != null &&
brokerConfig.isBrokerClientTlsEnabled()) {
+
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrlTls().toString())
+
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
+ if (brokerConfig.isBrokerClientTlsEnabledWithKeyStore()) {
+ clientBuilder.useKeyStoreTls(true)
+
.tlsKeyStoreType(brokerConfig.getBrokerClientTlsKeyStoreType())
+
.tlsKeyStorePath(brokerConfig.getBrokerClientTlsKeyStore())
+
.tlsKeyStorePassword(brokerConfig.getBrokerClientTlsKeyStorePassword())
+
.tlsTrustStoreType(brokerConfig.getBrokerClientTlsTrustStoreType())
+
.tlsTrustStorePath(brokerConfig.getBrokerClientTlsTrustStore())
+
.tlsTrustStorePassword(brokerConfig.getBrokerClientTlsTrustStorePassword());
+ } else {
+
clientBuilder.tlsTrustCertsFilePath(brokerConfig.getBrokerClientTrustCertsFilePath())
+
.tlsKeyFilePath(brokerConfig.getBrokerClientKeyFilePath())
+
.tlsCertificateFilePath(brokerConfig.getBrokerClientCertificateFilePath());
+ }
+ } else {
+ internalListener =
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
+
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
+ }
+
+ return clientBuilder.build();
+ }
+
public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander(arguments);
@@ -105,40 +144,10 @@ public class CompactorTool {
);
}
- ClientBuilder clientBuilder = PulsarClient.builder()
- .memoryLimit(0, SizeUnit.BYTES);
-
- // Apply all arbitrary configuration. This must be called before
setting any fields annotated as
- // @Secret on the ClientConfigurationData object because of the way
they are serialized.
- // See https://github.com/apache/pulsar/issues/8509 for more
information.
-
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(),
"brokerClient_"));
-
- if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
-
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
- brokerConfig.getBrokerClientAuthenticationParameters());
- }
-
- AdvertisedListener internalListener =
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
- if (internalListener.getBrokerServiceUrlTls() != null) {
- log.info("Found a TLS-based advertised listener in configuration
file. \n"
- + "Will connect pulsar use TLS.");
- clientBuilder
-
.serviceUrl(internalListener.getBrokerServiceUrlTls().toString())
-
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
-
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
-
- } else {
- internalListener =
ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
-
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
- }
-
@Cleanup(value = "shutdownNow")
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
- @Cleanup(value = "shutdownNow")
- OrderedScheduler executor =
OrderedScheduler.newSchedulerBuilder().build();
-
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
@@ -156,7 +165,7 @@ public class CompactorTool {
BookKeeper bk = bkClientFactory.create(brokerConfig, store,
eventLoopGroup, Optional.empty(), null);
@Cleanup
- PulsarClient pulsar = clientBuilder.build();
+ PulsarClient pulsar = createClient(brokerConfig);
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk,
scheduler);
long ledgerId = compactor.compact(arguments.topic).get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
index 795cf2b7f7c..85f6f16fdb0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorToolTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.compaction;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertTrue;
import com.beust.jcommander.Parameter;
import java.io.ByteArrayOutputStream;
@@ -25,6 +28,13 @@ import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Arrays;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.testng.annotations.Test;
@@ -72,4 +82,44 @@ public class CompactorToolTest {
System.setOut(oldStream);
}
}
+
+ @Test
+ public void testUseTlsUrlWithPEM() throws PulsarClientException {
+ ServiceConfiguration serviceConfiguration =
spy(ServiceConfiguration.class);
+ serviceConfiguration.setBrokerServicePortTls(Optional.of(6651));
+ serviceConfiguration.setBrokerClientTlsEnabled(true);
+ serviceConfiguration.setProperties(new Properties());
+
+ @Cleanup
+ PulsarClient ignored =
CompactorTool.createClient(serviceConfiguration);
+
+ verify(serviceConfiguration, times(1)).isBrokerClientTlsEnabled();
+ verify(serviceConfiguration, times(1)).isTlsAllowInsecureConnection();
+ verify(serviceConfiguration, times(1)).getBrokerClientKeyFilePath();
+ verify(serviceConfiguration,
times(1)).getBrokerClientTrustCertsFilePath();
+ verify(serviceConfiguration,
times(1)).getBrokerClientCertificateFilePath();
+ }
+
+ @Test
+ public void testUseTlsUrlWithKeystore() throws PulsarClientException {
+ ServiceConfiguration serviceConfiguration =
spy(ServiceConfiguration.class);
+ serviceConfiguration.setBrokerServicePortTls(Optional.of(6651));
+ serviceConfiguration.setBrokerClientTlsEnabled(true);
+ serviceConfiguration.setBrokerClientTlsEnabledWithKeyStore(true);
+
serviceConfiguration.setBrokerClientTlsTrustStore(MockedPulsarServiceBaseTest.BROKER_KEYSTORE_FILE_PATH);
+
+ serviceConfiguration.setProperties(new Properties());
+
+ @Cleanup
+ PulsarClient ignored =
CompactorTool.createClient(serviceConfiguration);
+
+ verify(serviceConfiguration, times(1)).isBrokerClientTlsEnabled();
+ verify(serviceConfiguration,
times(1)).isBrokerClientTlsEnabledWithKeyStore();
+ verify(serviceConfiguration, times(1)).getBrokerClientTlsKeyStore();
+ verify(serviceConfiguration,
times(1)).getBrokerClientTlsKeyStorePassword();
+ verify(serviceConfiguration,
times(1)).getBrokerClientTlsKeyStoreType();
+ verify(serviceConfiguration, times(1)).getBrokerClientTlsTrustStore();
+ verify(serviceConfiguration,
times(1)).getBrokerClientTlsTrustStorePassword();
+ verify(serviceConfiguration,
times(1)).getBrokerClientTlsTrustStoreType();
+ }
}