This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new d966c1af89f [Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
d966c1af89f is described below
commit d966c1af89f7447470c0cf853f28f466dfdaf9da
Author: JiangHaiting <[email protected]>
AuthorDate: Sun Jul 31 14:31:36 2022 +0800
[Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
* Revert "[fix][proxy] Fix client service url (#16834)"
This reverts commit 10b4e99a815d1c36660c315171f71d1f866a4f74.
* Revert "[Build] Use grpc-bom to align grpc library versions (#15234)"
This reverts commit 99c93d2f5fe2f72e185d46813383c7f7c984022d.
* Revert "upgrade aircompressor to 0.20 (#11790)"
This reverts commit 5ad16b623fdf3a2157577694ff4bc7fd06450a9b.
* Revert "[Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)"
This reverts commit 32fe228854464504d18de240f719b583cf262042.
* Revert changes of PersistentTopic#getMessageTTL in #12339.
Co-authored-by: JiangHaiting <[email protected]>
---
distribution/server/src/assemble/LICENSE.bin.txt | 2 +-
pom.xml | 32 ++--
.../pulsar/broker/service/BrokerService.java | 210 ++++++++++-----------
.../broker/service/persistent/PersistentTopic.java | 115 ++++++-----
.../pulsar/proxy/server/ProxyConnection.java | 4 +-
.../pulsar/proxy/server/ProxyConnectionTest.java | 24 ---
pulsar-sql/presto-distribution/LICENSE | 2 +-
7 files changed, 173 insertions(+), 216 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index 936528ad8b7..41052de646e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -425,7 +425,7 @@ The Apache Software License, Version 2.0
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
* AirCompressor
- - io.airlift-aircompressor-0.20.jar
+ - io.airlift-aircompressor-0.16.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.1.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
diff --git a/pom.xml b/pom.xml
index 40d045ec0dc..718ef2d81df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@ flexible messaging model and an intuitive client
API.</description>
<prometheus-jmx.version>0.14.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
- <aircompressor.version>0.20</aircompressor.version>
+ <aircompressor.version>0.16</aircompressor.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>
<jcommander.version>1.78</jcommander.version>
<commons-lang3.version>3.6</commons-lang3.version>
@@ -598,7 +598,7 @@ flexible messaging model and an intuitive client
API.</description>
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
-
+
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
@@ -787,14 +787,6 @@ flexible messaging model and an intuitive client
API.</description>
<version>${typetools.version}</version>
</dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-bom</artifactId>
- <version>${grpc.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
-
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
@@ -806,7 +798,25 @@ flexible messaging model and an intuitive client
API.</description>
</exclusion>
</exclusions>
</dependency>
-
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf-lite</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-bom</artifactId>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 40fa540b9c0..29d1001e4bd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,139 +1190,119 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
Optional<Policies> policies = Optional.empty();
Optional<LocalPolicies> localPolicies = Optional.empty();
- PersistencePolicies tmpPersistencePolicies = null;
- RetentionPolicies tmpRetentionPolicies = null;
- OffloadPolicies tmpTopicLevelOffloadPolicies = null;
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPolicies topicLevelOffloadPolicies = null;
if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
try {
TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
- tmpPersistencePolicies =
topicPolicies.getPersistence();
- tmpRetentionPolicies =
topicPolicies.getRetentionPolicies();
- tmpTopicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
+ persistencePolicies = topicPolicies.getPersistence();
+ retentionPolicies =
topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
}
} catch
(BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized
yet.", topicName);
}
}
- final PersistencePolicies finalPersistencePolicies =
tmpPersistencePolicies;
- final RetentionPolicies finalRetentionPolicies =
tmpRetentionPolicies;
- final OffloadPolicies finalTopicLevelOffloadPolicies =
tmpTopicLevelOffloadPolicies;
-
-
- CompletableFuture<Optional<Policies>> policiesFuture = pulsar
-
.getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
- namespace.toString()));
- String path = joinPath(LOCAL_POLICIES_ROOT,
topicName.getNamespaceObject().toString());
- CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
-
pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
-
- policiesFuture.thenCombine(localPoliciesFuture, (optPolicies,
optLocalPolicies) -> {
- PersistencePolicies persistencePolicies =
finalPersistencePolicies;
- RetentionPolicies retentionPolicies = finalRetentionPolicies;
- OffloadPolicies topicLevelOffloadPolicies =
finalTopicLevelOffloadPolicies;
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
- () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ try {
+ policies = pulsar
+
.getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
+ namespace.toString()));
+ String path = joinPath(LOCAL_POLICIES_ROOT,
topicName.getNamespaceObject().toString());
+ localPolicies =
pulsar().getLocalZkCacheService().policiesCache().get(path);
+ } catch (Throwable t) {
+ // Ignoring since if we don't have policies, we fallback on
the default
+ log.warn("Got exception when reading persistence policy for
{}: {}", topicName, t.getMessage(), t);
+ future.completeExceptionally(t);
+ return;
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
- () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-
serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
+ () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
-
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
-
ZkIsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
-
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
-
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
-
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig.setMaxUnackedRangesToPersist(
-
serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
- managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
-
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
-
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-
managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
-
managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
-
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-
managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
+ () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+ serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
+
+ ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
+
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
-
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig.setLedgerRolloverTimeout(
-
serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
-
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
- OffloadPolicies nsLevelOffloadPolicies = policies.map(p ->
p.offload_policies).orElse(null);
- OffloadPolicies offloadPolicies =
OffloadPolicies.mergeConfiguration(
- topicLevelOffloadPolicies,
-
OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
-
pulsar().createManagedLedgerOffloader(offloadPolicies);
-
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- future.completeExceptionally(e);
- return null;
- }
- } else {
- //If the topic level policy is null, use the namespace
level
- managedLedgerConfig.setLedgerOffloader(
- pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
+
.setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ Map<String, Object> properties = Maps.newHashMap();
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+
managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+
managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+
managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+
managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+
managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
+
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+ OffloadPolicies nsLevelOffloadPolicies = policies.map(p ->
p.offload_policies).orElse(null);
+ OffloadPolicies offloadPolicies =
OffloadPolicies.mergeConfiguration(
+ topicLevelOffloadPolicies,
+
OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
pulsar().createManagedLedgerOffloader(offloadPolicies);
+
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ future.completeExceptionally(e);
+ return;
}
+ } else {
+ //If the topic level policy is null, use the namespace level
+
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
+ }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-
-
- future.complete(managedLedgerConfig);
- return null;
- }).exceptionally(ex -> {
- log.warn("Got exception when reading persistence policy for
{}: {}", topicName, ex.getMessage(), ex);
- future.completeExceptionally(ex);
- return null;
- });
+
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+
managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
return future;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c2fd32a44fd..a5613d439a5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,7 +157,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@SuppressWarnings("unused")
private volatile long usageCount = 0;
-
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1156,72 +1155,68 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
log.debug("[{}] Checking replication status", name);
}
- return brokerService.pulsar().getConfigurationCache().policiesCache()
- .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
- .thenCompose(optPolicies -> {
- if (!optPolicies.isPresent()) {
- return FutureUtil.failedFuture(
- new ServerMetadataException("Namespace not
found: " + name.getNamespace()));
- }
-
- Policies policies = optPolicies.get();
+ Policies policies = null;
+ try {
+ policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, name.getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
+ } catch (Exception e) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(new ServerMetadataException(e));
+ return future;
+ }
+ //Ignore current broker's config for messageTTL for replication.
+ final int newMessageTTLinSeconds;
+ try {
+ newMessageTTLinSeconds = getMessageTTL();
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(new ServerMetadataException(e));
+ }
- //Ignore current broker's config for messageTTL for
replication.
- final int newMessageTTLinSeconds;
- try {
- newMessageTTLinSeconds = getMessageTTL();
- } catch (Exception e) {
- return FutureUtil.failedFuture(new
ServerMetadataException(e));
- }
+ Set<String> configuredClusters;
+ if (policies.replication_clusters != null) {
+ configuredClusters =
Sets.newTreeSet(policies.replication_clusters);
+ } else {
+ configuredClusters = Collections.emptySet();
+ }
- Set<String> configuredClusters;
- if (policies.replication_clusters != null) {
- configuredClusters =
Sets.newTreeSet(policies.replication_clusters);
- } else {
- configuredClusters = Collections.emptySet();
- }
+ String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
- String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
-
- // if local cluster is removed from global namespace
cluster-list : then delete topic forcefully
- // because pulsar
- // doesn't serve global topic without local repl-cluster
configured.
- if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
- log.info(
- "Deleting topic [{}] because local cluster is
not part of global namespace repl list "
- + "{}",
- topic, configuredClusters);
- return deleteForcefully();
- }
+ // if local cluster is removed from global namespace cluster-list :
then delete topic forcefully because pulsar
+ // doesn't serve global topic without local repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
+ log.info("Deleting topic [{}] because local cluster is not part of
global namespace repl list {}",
+ topic, configuredClusters);
+ return deleteForcefully();
+ }
- List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ List<CompletableFuture<Void>> futures = Lists.newArrayList();
- // Check for missing replicators
- for (String cluster : configuredClusters) {
- if (cluster.equals(localCluster)) {
- continue;
- }
+ // Check for missing replicators
+ for (String cluster : configuredClusters) {
+ if (cluster.equals(localCluster)) {
+ continue;
+ }
- if (!replicators.containsKey(cluster)) {
- futures.add(startReplicator(cluster));
- }
- }
+ if (!replicators.containsKey(cluster)) {
+ futures.add(startReplicator(cluster));
+ }
+ }
- // Check for replicators to be stopped
- replicators.forEach((cluster, replicator) -> {
- // Update message TTL
- ((PersistentReplicator)
replicator).updateMessageTTL(newMessageTTLinSeconds);
+ // Check for replicators to be stopped
+ replicators.forEach((cluster, replicator) -> {
+ // Update message TTL
+ ((PersistentReplicator)
replicator).updateMessageTTL(newMessageTTLinSeconds);
- if (!cluster.equals(localCluster)) {
- if (!configuredClusters.contains(cluster)) {
- futures.add(removeReplicator(cluster));
- }
- }
+ if (!cluster.equals(localCluster)) {
+ if (!configuredClusters.contains(cluster)) {
+ futures.add(removeReplicator(cluster));
+ }
+ }
- });
+ });
- return FutureUtil.waitForAll(futures);
- });
+ return FutureUtil.waitForAll(futures);
}
@Override
@@ -2480,10 +2475,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES,
name.getNamespace()));
- if (policies == null) {
- throw new KeeperException.NoNodeException();
- }
+ .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 6ae0e52f961..3c73284e7ed 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -529,10 +529,8 @@ public class ProxyConnection extends PulsarHandler {
ClientConfigurationData createClientConfiguration()
throws PulsarClientException.UnsupportedAuthenticationException {
ClientConfigurationData initialConf = new ClientConfigurationData();
+ initialConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
- initialConf.setServiceUrl(
- proxyConfig.isTlsEnabledWithBroker() ?
service.getServiceUrlTls() : service.getServiceUrl());
-
// Apply all arbitrary configuration. This must be called before
setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way
they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more
information.
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
index 8c07e4b42d7..5f533e37d35 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -18,12 +18,8 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;
public class ProxyConnectionTest {
@@ -39,24 +35,4 @@ public class ProxyConnectionTest {
assertFalse(ProxyConnection
.matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345",
"1.2.3.4:1234"));
}
- @Test
- public void testCreateClientConfiguration() {
- ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
- proxyConfiguration.setTlsEnabledWithBroker(true);
- String proxyUrlTls = "pulsar+ssl://proxy:6651";
- String proxyUrl = "pulsar://proxy:6650";
-
- ProxyService proxyService = mock(ProxyService.class);
- doReturn(proxyConfiguration).when(proxyService).getConfiguration();
- doReturn(proxyUrlTls).when(proxyService).getServiceUrlTls();
- doReturn(proxyUrl).when(proxyService).getServiceUrl();
-
- ProxyConnection proxyConnection = new ProxyConnection(proxyService,
null);
- ClientConfigurationData clientConfiguration =
proxyConnection.createClientConfiguration();
- assertEquals(clientConfiguration.getServiceUrl(), proxyUrlTls);
-
- proxyConfiguration.setTlsEnabledWithBroker(false);
- clientConfiguration = proxyConnection.createClientConfiguration();
- assertEquals(clientConfiguration.getServiceUrl(), proxyUrl);
- }
}
diff --git a/pulsar-sql/presto-distribution/LICENSE
b/pulsar-sql/presto-distribution/LICENSE
index 906cad72435..a52dda264ba 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -273,7 +273,7 @@ The Apache Software License, Version 2.0
* CGLIB Nodep
- cglib-nodep-3.3.0.jar
* Airlift
- - aircompressor-0.20.jar
+ - aircompressor-0.16.jar
- airline-0.8.jar
- bootstrap-0.199.jar
- bootstrap-0.195.jar