This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 01dfd3e03ea Revert "[fix][broker] Fix inconsistent topic policy
(#21231)"
01dfd3e03ea is described below
commit 01dfd3e03eab64a17d2d925448f3b571ce710d90
Author: fengyubiao <[email protected]>
AuthorDate: Sat Oct 7 23:45:30 2023 +0800
Revert "[fix][broker] Fix inconsistent topic policy (#21231)"
This reverts commit a0e2104642c5081986dbe7847c71997b06030eaa.
---
.../ProxySaslAuthenticationTest.java | 13 +-
.../authentication/SaslAuthenticateTest.java | 11 +-
.../pulsar/broker/service/BrokerService.java | 295 ++++++++++-----------
.../SystemTopicBasedTopicPoliciesService.java | 100 ++-----
.../broker/service/TopicPoliciesService.java | 41 ---
.../pulsar/broker/admin/PersistentTopicsTest.java | 1 -
.../pulsar/broker/admin/TopicPoliciesTest.java | 2 +-
.../admin/TopicPoliciesWithBrokerRestartTest.java | 104 --------
.../apache/pulsar/broker/admin/TopicsAuthTest.java | 2 -
.../apache/pulsar/broker/auth/AuthLogsTest.java | 2 -
.../pulsar/broker/auth/MockAuthentication.java | 6 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 19 --
.../broker/service/BrokerBookieIsolationTest.java | 3 +-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 6 +-
.../service/persistent/PersistentTopicTest.java | 3 +-
.../AuthenticationTlsHostnameVerificationTest.java | 3 -
.../api/AuthorizationProducerConsumerTest.java | 5 -
.../client/api/MutualAuthenticationTest.java | 2 +-
.../TokenAuthenticatedProducerConsumerTest.java | 3 -
...kenOauth2AuthenticatedProducerConsumerTest.java | 14 +-
...eyStoreTlsProducerConsumerTestWithAuthTest.java | 22 --
.../impl/PatternTopicsConsumerImplAuthTest.java | 1 -
.../configurations/standalone_no_client_auth.conf | 3 +-
.../proxy/server/ProxyAuthenticationTest.java | 2 +-
.../proxy/server/ProxyForwardAuthDataTest.java | 2 +-
.../proxy/server/ProxyRolesEnforcementTest.java | 2 +-
.../server/ProxyWithAuthorizationNegTest.java | 2 -
.../apache/pulsar/sql/presto/TestPulsarAuth.java | 4 -
.../loadbalance/ExtensibleLoadManagerTest.java | 1 -
.../integration/presto/TestPulsarSQLAuth.java | 1 -
30 files changed, 197 insertions(+), 478 deletions(-)
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index f0e45aa734a..261efe680f8 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -49,7 +49,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
@@ -194,17 +193,15 @@ public class ProxySaslAuthenticationTest extends
ProducerConsumerBase {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client/" +
localHostname + "@" + kdc.getRealm()));
- // set admin auth, to verify admin web resources
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName",
"PulsarClient");
- clientSaslConfig.put("serverType", "broker");
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
- conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
super.init();
lookupUrl = new URI(pulsar.getBrokerServiceUrl());
+
+ // set admin auth, to verify admin web resources
+ Map<String, String> clientSaslConfig = new HashMap<>();
+ clientSaslConfig.put("saslJaasClientSectionName",
"PulsarClient");
+ clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 76c6f023d9a..8a0d0392d13 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.SaslConstants;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -181,12 +180,7 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" +
kdc.getRealm()));
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
- clientSaslConfig.put("serverType", "broker");
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
- conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
super.init();
lookupUrl = new URI(pulsar.getWebServiceAddress());
@@ -197,6 +191,9 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
.authentication(authSasl));
// set admin auth, to verify admin web resources
+ Map<String, String> clientSaslConfig = new HashMap<>();
+ clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+ clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7c7dec864e2..d6b17f4faa4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1769,172 +1769,165 @@ public class BrokerService implements Closeable {
});
}
- public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@Nonnull TopicName topicName) {
- requireNonNull(topicName);
+ public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(TopicName topicName) {
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr =
pulsar.getPulsarResources().getLocalPolicies();
- final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
- if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
- && !NamespaceService.isSystemServiceNamespace(namespace.toString())
- &&
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
- topicPoliciesFuture =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
- } else {
- topicPoliciesFuture =
CompletableFuture.completedFuture(Optional.empty());
- }
- return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
- final CompletableFuture<Optional<Policies>> nsPolicies =
nsr.getPoliciesAsync(namespace);
- final CompletableFuture<Optional<LocalPolicies>> lcPolicies =
lpr.getLocalPoliciesAsync(namespace);
- return nsPolicies.thenCombine(lcPolicies, (policies,
localPolicies) -> {
- PersistencePolicies persistencePolicies = null;
- RetentionPolicies retentionPolicies = null;
- OffloadPoliciesImpl topicLevelOffloadPolicies = null;
- if (topicPoliciesOptional.isPresent()) {
- final TopicPolicies topicPolicies =
topicPoliciesOptional.get();
- persistencePolicies = topicPolicies.getPersistence();
- retentionPolicies = topicPolicies.getRetentionPolicies();
- topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
- }
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
- () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ return nsr.getPoliciesAsync(namespace)
+ .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies,
localPolicies) -> {
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+
+ if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+ &&
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+ final TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService()
+ .getTopicPoliciesIfExists(topicName);
+ if (topicPolicies != null) {
+ persistencePolicies =
topicPolicies.getPersistence();
+ retentionPolicies =
topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
+ }
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
- () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-
serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
+ () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
-
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-
- if (serviceConfig.isStrictBookieAffinityEnabled()) {
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
- IsolatedBookieEnsemblePlacementPolicy.class);
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- } else if (isSystemTopic(topicName)) {
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"*");
- properties.put(IsolatedBookieEnsemblePlacementPolicy
- .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- } else {
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"");
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
"");
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
+ () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+
serviceConfig.getDefaultRetentionSizeInMB())
+ );
}
- } else {
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
+
+ ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
+
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else if (isSystemTopic(topicName)) {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"*");
+
properties.put(IsolatedBookieEnsemblePlacementPolicy
+ .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"");
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
"");
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ } else {
+ if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+
IsolatedBookieEnsemblePlacementPolicy.class);
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
}
- }
-
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig
-
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
-
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
-
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
-
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
-
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig
-
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig
-
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
-
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig
-
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig
-
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
- managedLedgerConfig
-
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig
-
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig
-
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
-
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
- managedLedgerConfig.setInactiveLedgerRollOverTime(
-
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
- managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
- serviceConfig.isCacheEvictionByMarkDeletedPosition());
- managedLedgerConfig.setMinimumBacklogCursorsForCaching(
-
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
- managedLedgerConfig.setMinimumBacklogEntriesForCaching(
-
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
- managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
-
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
- OffloadPoliciesImpl nsLevelOffloadPolicies =
- (OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);
- OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(
- topicLevelOffloadPolicies,
-
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
- } else {
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
-
pulsar().createManagedLedgerOffloader(offloadPolicies);
-
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
+
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig
+
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig
+
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig
+
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig
+
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig
+
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig
+
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig
+
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
+
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+ managedLedgerConfig.setInactiveLedgerRollOverTime(
+
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
+ managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+
serviceConfig.isCacheEvictionByMarkDeletedPosition());
+ managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+ managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+ managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+ OffloadPoliciesImpl nsLevelOffloadPolicies =
+ (OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);
+ OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(
+ topicLevelOffloadPolicies,
+
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+ } else {
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
+
pulsar().createManagedLedgerOffloader(offloadPolicies);
+
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ //If the topic level policy is null, use the
namespace level
+ managedLedgerConfig
+
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
}
- } else {
- //If the topic level policy is null, use the namespace
level
- managedLedgerConfig
-
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
}
- }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
- return managedLedgerConfig;
- });
- });
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ return managedLedgerConfig;
+ });
}
private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index ed76d37ae25..09f8de818db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,14 +18,12 @@
*/
package org.apache.pulsar.broker.service;
-import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -56,7 +54,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,8 +78,8 @@ public class SystemTopicBasedTopicPoliciesService implements
TopicPoliciesServic
private final Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();
-
- final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new
ConcurrentHashMap<>();
+ @VisibleForTesting
+ final Map<NamespaceName, Boolean> policyCacheInitMap = new
ConcurrentHashMap<>();
@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners =
new ConcurrentHashMap<>();
@@ -222,12 +219,12 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
boolean isGlobal) throws
TopicPoliciesCacheNotInitException {
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
- prepareInitPoliciesCacheAsync(namespace);
+ prepareInitPoliciesCache(namespace, new CompletableFuture<>());
}
MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result
= new MutablePair<>();
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k,
initialized) -> {
- if (initialized == null || !initialized.isDone()) {
+ if (initialized == null || !initialized) {
result.setLeft(new TopicPoliciesCacheNotInitException());
} else {
TopicPolicies topicPolicies =
@@ -245,34 +242,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
- @NotNull
- @Override
- public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NotNull TopicName topicName,
-
boolean isGlobal) {
- requireNonNull(topicName);
- final CompletableFuture<Void> preparedFuture =
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
- return preparedFuture.thenApply(__ -> {
- final TopicPolicies candidatePolicies = isGlobal
- ?
globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
- :
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
- return Optional.ofNullable(candidatePolicies);
- });
- }
-
- @NotNull
- @Override
- public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NotNull TopicName topicName) {
- requireNonNull(topicName);
- final CompletableFuture<Void> preparedFuture =
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
- return preparedFuture.thenApply(__ -> {
- final TopicPolicies localPolicies =
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
- if (localPolicies != null) {
- return Optional.of(localPolicies);
- }
- return
Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
- });
- }
-
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -296,48 +265,39 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Override
public CompletableFuture<Void>
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.isHeartbeatNamespace(namespace)) {
- return CompletableFuture.completedFuture(null);
+ result.complete(null);
+ return result;
}
synchronized (this) {
if (readerCaches.get(namespace) != null) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
- return CompletableFuture.completedFuture(null);
+ result.complete(null);
} else {
- return prepareInitPoliciesCacheAsync(namespace);
+ prepareInitPoliciesCache(namespace, result);
}
}
+ return result;
}
- private @Nonnull CompletableFuture<Void>
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
- requireNonNull(namespace);
- return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
- final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+ private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace,
CompletableFuture<Void> result) {
+ if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new
AtomicInteger(1));
- final CompletableFuture<Void> initFuture = readerCompletableFuture
- .thenCompose(reader -> {
- final CompletableFuture<Void> stageFuture = new
CompletableFuture<>();
- initPolicesCache(reader, stageFuture);
- return stageFuture
- // Read policies in background
- .thenAccept(__ ->
readMorePoliciesAsync(reader));
- });
- initFuture.exceptionally(ex -> {
- try {
- log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
- cleanCacheAndCloseReader(namespace, false);
- } catch (Throwable cleanupEx) {
- // Adding this catch to avoid break callback chain
- log.error("[{}] Failed to cleanup reader on
__change_events topic", namespace, cleanupEx);
- }
+ readerCompletableFuture.thenAccept(reader -> {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ result.completeExceptionally(ex);
return null;
});
- // let caller know we've got an exception.
- return initFuture;
- });
+ }
}
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClientWithRetry(
@@ -421,7 +381,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.",
reader.getSystemTopic().getTopicName());
}
-
+ policyCacheInitMap.computeIfPresent(
+
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
@@ -434,7 +395,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
}));
-
future.complete(null);
}
});
@@ -460,13 +420,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- /**
- * This is an async method for the background reader to continue syncing
new messages.
- *
- * Note: You should not do any blocking call here. because it will affect
- * #{@link
SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method
to block loading topic.
- */
- private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent>
reader) {
+ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent>
reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
@@ -474,7 +428,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
})
.whenComplete((__, ex) -> {
if (ex == null) {
- readMorePoliciesAsync(reader);
+ readMorePolicies(reader);
} else {
Throwable cause =
FutureUtil.unwrapCompletionException(ex);
if (cause instanceof
PulsarClientException.AlreadyClosedException) {
@@ -483,7 +437,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read
again.", ex);
- readMorePoliciesAsync(reader);
+ readMorePolicies(reader);
}
}
});
@@ -651,7 +605,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
@VisibleForTesting
- public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName
namespaceName) {
+ public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index aa3a6aaeff2..c4bcc0c3935 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,7 +22,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -32,7 +31,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
-import org.jetbrains.annotations.NotNull;
/**
* Topic policies service.
@@ -111,32 +109,6 @@ public interface TopicPoliciesService {
return response;
}
- /**
- * Asynchronously retrieves topic policies.
- * This triggers the Pulsar broker's internal client to load policies from
the
- * system topic `persistent://tenant/namespace/__change_event`.
- *
- * @param topicName The name of the topic.
- * @param isGlobal Indicates if the policies are global.
- * @return A CompletableFuture containing an Optional of TopicPolicies.
- * @throws NullPointerException If the topicName is null.
- */
- @Nonnull
- CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull
TopicName topicName, boolean isGlobal);
-
- /**
- * Asynchronously retrieves topic policies.
- * This triggers the Pulsar broker's internal client to load policies from
the
- * system topic `persistent://tenant/namespace/__change_event`.
- *
- * NOTE: If local policies are not available, it will fallback to using
topic global policies.
- * @param topicName The name of the topic.
- * @return A CompletableFuture containing an Optional of TopicPolicies.
- * @throws NullPointerException If the topicName is null.
- */
- @Nonnull
- CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull
TopicName topicName);
-
/**
* Get policies for a topic without cache async.
* @param topicName topic name
@@ -190,19 +162,6 @@ public interface TopicPoliciesService {
return null;
}
- @NotNull
- @Override
- public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NotNull TopicName topicName,
-
boolean isGlobal) {
- return CompletableFuture.completedFuture(Optional.empty());
- }
-
- @NotNull
- @Override
- public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NotNull TopicName topicName) {
- return CompletableFuture.completedFuture(Optional.empty());
- }
-
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 8dee90af4af..284e50c8302 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -126,7 +126,6 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
@Override
@BeforeMethod
protected void setup() throws Exception {
- conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index faf141a5d1c..87471f4972f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -180,7 +180,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
-
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
+
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
});
//load the topic.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
deleted file mode 100644
index 672fc2c95f8..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.admin;
-
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-@Test(groups = "broker-admin")
-public class TopicPoliciesWithBrokerRestartTest extends
MockedPulsarServiceBaseTest {
-
- @Override
- @BeforeClass(alwaysRun = true)
- protected void setup() throws Exception {
- super.internalSetup();
- setupDefaultTenantAndNamespace();
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
-
- @Test
- public void testRetentionWithBrokerRestart() throws Exception {
- final int messages = 1_000;
- final int topicNum = 500;
- // (1) Init topic
- admin.namespaces().createNamespace("public/retention");
- final String topicName =
"persistent://public/retention/retention_with_broker_restart";
- admin.topics().createNonPartitionedTopic(topicName);
- for (int i = 0; i < topicNum; i++) {
- final String shadowTopicNames = topicName + "_" + i;
- admin.topics().createNonPartitionedTopic(shadowTopicNames);
- }
- // (2) Set retention
- final RetentionPolicies retentionPolicies = new RetentionPolicies(20,
20);
- for (int i = 0; i < topicNum; i++) {
- final String shadowTopicNames = topicName + "_" + i;
- admin.topicPolicies().setRetention(shadowTopicNames,
retentionPolicies);
- }
- admin.topicPolicies().setRetention(topicName, retentionPolicies);
- // (3) Send messages
- @Cleanup
- final Producer<byte[]> publisher = pulsarClient.newProducer()
- .topic(topicName)
- .create();
- for (int i = 0; i < messages; i++) {
- publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
- }
- // (4) Check configuration
- Awaitility.await().untilAsserted(() -> {
- final PersistentTopic persistentTopic1 = (PersistentTopic)
- pulsar.getBrokerService().getTopic(topicName,
true).join().get();
- final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl)
persistentTopic1.getManagedLedger();
-
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
-
Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
- TimeUnit.MINUTES.toMillis(20));
- });
- // (5) Restart broker
- restartBroker();
- // (6) Check configuration again
- for (int i = 0; i < topicNum; i++) {
- final String shadowTopicNames = topicName + "_" + i;
- admin.lookups().lookupTopic(shadowTopicNames);
- final PersistentTopic persistentTopicTmp = (PersistentTopic)
- pulsar.getBrokerService().getTopic(shadowTopicNames,
true).join().get();
- final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl)
persistentTopicTmp.getManagedLedger();
-
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
-
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
- TimeUnit.MINUTES.toMillis(20));
- }
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
index 234af7afa8d..efd8b66d754 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
@@ -84,8 +84,6 @@ public class TopicsAuthTest extends
MockedPulsarServiceBaseTest {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
- conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
super.internalSetup();
PulsarAdminBuilder pulsarAdminBuilder =
PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString() : brokerUrlTls.toString())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
index 942a42fa7aa..6ffcecbeb9f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -60,8 +60,6 @@ public class AuthLogsTest extends MockedPulsarServiceBaseTest
{
conf.setAuthorizationEnabled(true);
conf.setAuthorizationAllowWildcardsMatching(true);
conf.setSuperUserRoles(Sets.newHashSet("super"));
-
conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
- conf.setBrokerClientAuthenticationParameters("user:pass.pass");
internalSetup();
try (PulsarAdmin admin = PulsarAdmin.builder()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
index 25ac59796b0..0b1726617f7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -29,10 +29,7 @@ import org.slf4j.LoggerFactory;
public class MockAuthentication implements Authentication {
private static final Logger log =
LoggerFactory.getLogger(MockAuthentication.class);
- private String user;
-
- public MockAuthentication() {
- }
+ private final String user;
public MockAuthentication(String user) {
this.user = user;
@@ -70,7 +67,6 @@ public class MockAuthentication implements Authentication {
@Override
public void configure(Map<String, String> authParams) {
- this.user = authParams.get("user");
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 0f04088e21a..b16dc27e265 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -42,7 +42,6 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -54,8 +53,6 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -225,22 +222,6 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
protected final void init() throws Exception {
doInitConf();
- // trying to config the broker internal client
- if (conf.getWebServicePortTls().isPresent()
- &&
conf.getAuthenticationProviders().contains(AuthenticationProviderTls.class.getName())
- && !conf.isTlsEnabledWithKeyStore()) {
- // enabled TLS
- if (conf.getBrokerClientAuthenticationPlugin() == null
- ||
conf.getBrokerClientAuthenticationPlugin().equals(AuthenticationDisabled.class.getName()))
{
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
- conf.setBrokerClientAuthenticationParameters("tlsCertFile:" +
BROKER_CERT_FILE_PATH
- + ",tlsKeyFile:"
+ BROKER_KEY_FILE_PATH);
- conf.setBrokerClientTlsEnabled(true);
- conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
- conf.setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
- conf.setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
- }
- }
startBroker();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 5252407892e..951892f4ebf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -304,7 +304,6 @@ public class BrokerBookieIsolationTest {
bookies[3].getBookieId());
ServiceConfiguration config = new ServiceConfiguration();
- config.setTopicLevelPoliciesEnabled(false);
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
@@ -613,9 +612,9 @@ public class BrokerBookieIsolationTest {
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
- config.setTopicLevelPoliciesEnabled(false);
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
+
config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 5b70ff99675..31b5bcb23cd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -141,7 +141,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
// Wait for all topic policies updated.
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(systemTopicBasedTopicPoliciesService
-
.getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
+ .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
// Assert broker is cache all topic policies
Awaitility.await().untilAsserted(() ->
@@ -304,8 +304,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
@Test
public void testGetPolicyTimeout() throws Exception {
SystemTopicBasedTopicPoliciesService service =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
- Awaitility.await().untilAsserted(() ->
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
- service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new
CompletableFuture<>());
+ Awaitility.await().untilAsserted(() ->
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
+ service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
long start = System.currentTimeMillis();
Backoff backoff = new BackoffBuilder()
.setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ac2727e33eb..41704af0b8b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -45,7 +45,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -621,7 +620,7 @@ public class PersistentTopicTest extends BrokerTestBase {
doReturn(policiesService).when(pulsar).getTopicPoliciesService();
TopicPolicies policies = new TopicPolicies();
policies.setRetentionPolicies(retentionPolicies);
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic));
+
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
persistentTopic.onUpdate(policies);
verify(persistentTopic, times(1)).checkPersistencePolicies();
Awaitility.await().untilAsserted(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index a392f150e85..f2631f59121 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -30,7 +30,6 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.tls.PublicSuffixMatcher;
import org.apache.pulsar.common.tls.TlsHostnameVerifier;
-import org.assertj.core.util.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -148,7 +147,6 @@ public class AuthenticationTlsHostnameVerificationTest
extends ProducerConsumerB
// setup broker cert which has CN = "pulsar" different than broker's
hostname="localhost"
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
-
conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
@@ -190,7 +188,6 @@ public class AuthenticationTlsHostnameVerificationTest
extends ProducerConsumerB
// setup broker cert which has CN = "localhost"
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
-
conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index ba41848bf2c..0ce3b7df07d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -119,7 +119,6 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
public void testProducerAndConsumerAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
- conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
setup();
@@ -180,7 +179,6 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
public void testSubscriberPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
- conf.setTopicLevelPoliciesEnabled(false);
conf.setEnablePackagesManagement(true);
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
@@ -371,7 +369,6 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
- conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();
@@ -613,7 +610,6 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
- conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
setup();
@@ -698,7 +694,6 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
public void testPermissionForProducerCreateInitialSubscription() throws
Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
- conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 81d65b19204..2fc8aebf64a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -195,7 +195,7 @@ public class MutualAuthenticationTest extends
ProducerConsumerBase {
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
conf.setSuperUserRoles(superUserRoles);
- conf.setTopicLevelPoliciesEnabled(false);
+
conf.setAuthorizationEnabled(true);
conf.setAuthenticationEnabled(true);
Set<String> providersClassNames =
Sets.newHashSet(MutualAuthenticationProvider.class.getName());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 4d5e7deaf7d..87f12e6acdc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -37,7 +37,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
@@ -93,8 +92,6 @@ public class TokenAuthenticatedProducerConsumerTest extends
ProducerConsumerBase
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
- conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
conf.setClusterName("test");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index ba43ee6d6a2..fdf41c4a6ad 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -27,9 +27,7 @@ import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -41,7 +39,6 @@ import
org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,12 +87,11 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
conf.setAuthenticationProviders(providers);
conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
- final Map<String, String> oauth2Param = new HashMap<>();
- oauth2Param.put("privateKey", CREDENTIALS_FILE);
- oauth2Param.put("issuerUrl", server.getIssuer());
- oauth2Param.put("audience", audience);
- conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-
.getMapper().getObjectMapper().writeValueAsString(oauth2Param));
+ conf.setBrokerClientAuthenticationParameters("{\n"
+ + " \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n"
+ + " \"issuerUrl\": \"" + server.getIssuer() + "\",\n"
+ + " \"audience\": \"" + audience + "\",\n"
+ + "}\n");
conf.setClusterName("test");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
index 77405e14201..8e508b6cf20 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
@@ -32,7 +32,6 @@ import java.util.function.Supplier;
import io.jsonwebtoken.SignatureAlgorithm;
import lombok.Cleanup;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -50,7 +49,6 @@ import
org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -85,7 +83,6 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest
extends ProducerConsume
super.internalCleanup();
}
- @SneakyThrows
protected void internalSetUpForBroker() {
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
@@ -117,25 +114,6 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest
extends ProducerConsume
conf.setAuthenticationProviders(providers);
conf.setNumExecutorThreadPoolSize(5);
- Set<String> tlsProtocols = Sets.newConcurrentHashSet();
- tlsProtocols.add("TLSv1.3");
- tlsProtocols.add("TLSv1.2");
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
- Map<String, String> authParams = new HashMap<>();
- authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
- authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH,
CLIENT_KEYSTORE_FILE_PATH);
- authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW,
CLIENT_KEYSTORE_PW);
-
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory.getMapper()
- .getObjectMapper().writeValueAsString(authParams));
- conf.setBrokerClientTlsEnabled(true);
- conf.setBrokerClientTlsEnabledWithKeyStore(true);
- conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
- conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
- conf.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH);
- conf.setBrokerClientTlsKeyStoreType(KEYSTORE_TYPE);
- conf.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW);
- conf.setBrokerClientTlsProtocols(tlsProtocols);
-
}
protected void internalSetUpForClient(boolean addCertificates, String
lookupUrl) throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index b9139dabdf0..76936334eb0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -85,7 +85,6 @@ public class PatternTopicsConsumerImplAuthTest extends
ProducerConsumerBase {
// set isTcpLookup = true, to use BinaryProtoLookupService to get
topics for a pattern.
isTcpLookup = true;
- conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
diff --git
a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index 4e2fd402983..d9411e655ad 100644
---
a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++
b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -29,5 +29,4 @@ authenticationEnabled=true
authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
-loadBalancerOverrideBrokerNicSpeedGbps=2
-topicLevelPoliciesEnabled=false
\ No newline at end of file
+loadBalancerOverrideBrokerNicSpeedGbps=2
\ No newline at end of file
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 9c8e5197adf..8229d929ee5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -168,7 +168,7 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
// Expires after an hour
conf.setBrokerClientAuthenticationParameters(
- "entityType:admin,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000));
+ "entityType:broker,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000));
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index b7cfb874747..99af3b1cf6a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -53,7 +53,7 @@ public class ProxyForwardAuthDataTest extends
ProducerConsumerBase {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- conf.setBrokerClientAuthenticationParameters("authParam:admin");
+ conf.setBrokerClientAuthenticationParameters("authParam:broker");
conf.setAuthenticateOriginalAuthData(true);
Set<String> superUserRoles = new HashSet<String>();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 3259cfd95c7..2c8c382b6a5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -144,7 +144,7 @@ public class ProxyRolesEnforcementTest extends
ProducerConsumerBase {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- conf.setBrokerClientAuthenticationParameters("authParam:admin");
+ conf.setBrokerClientAuthenticationParameters("authParam:broker");
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 2d97a4b06a8..e8bb128c8c1 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -78,8 +78,6 @@ public class ProxyWithAuthorizationNegTest extends
ProducerConsumerBase {
protected void setup() throws Exception {
// enable tls and auth&auth at broker
- conf.setTopicLevelPoliciesEnabled(false);
-
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 7b550b7270f..9119ffed4e2 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -38,7 +38,6 @@ import
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -64,9 +63,6 @@ public class TestPulsarAuth extends
MockedPulsarServiceBaseTest {
conf.setProperties(properties);
conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE));
conf.setClusterName("c1");
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
- conf.setBrokerClientAuthenticationParameters("token:" + AuthTokenUtils
- .createToken(secretKey, SUPER_USER_ROLE, Optional.empty()));
internalSetup();
admin.clusters().createCluster("c1", ClusterData.builder().build());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 49e5ae37834..057039edc3b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -89,7 +89,6 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
"org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
- brokerEnvs.put("topicLevelPoliciesEnabled", "false");
brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
spec.brokerEnvs(brokerEnvs);
pulsarCluster = PulsarCluster.forSpec(spec);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 87db46f2bb6..0a9bb5e1959 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -68,7 +68,6 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
envMap.put("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(secretKey));
envMap.put("superUserRoles", "admin");
envMap.put("brokerDeleteInactiveTopicsEnabled", "false");
- envMap.put("topicLevelPoliciesEnabled", "false");
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
brokerContainer.withEnv(envMap);