codelipenghui commented on code in PR #25443:
URL: https://github.com/apache/pulsar/pull/25443#discussion_r3192488352
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -4136,6 +4153,38 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
this.pulsarChannelInitFactory = factory;
}
+ /**
+ * @return Triple [namespace policies, global topic policies, topic policies].
+ */
+ public CompletableFuture<Boolean> isCurrentClusterAllowed(@NonNull TopicName topicName) {
+ final String cluster = getPulsar().getConfig().getClusterName();
+ return getCombinedTopicPolicies(topicName).thenApply(triple -> {
+ Optional<TopicPolicies> topicP = triple.getRight();
+ Optional<TopicPolicies> globalTopicP = triple.getMiddle();
+ Optional<Policies> nsPolicies = triple.getLeft();
+ // Disabled a cluster for a namespace manually.
+ if (nsPolicies.isPresent() && !isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) {
Review Comment:
Possible regression: `isCurrentClusterAllowed(NamespaceName, Policies)`
returns false when namespace `replication_clusters` excludes the current
cluster (with empty `allowed_clusters`). But topic-level `replicationClusters`
is supposed to override namespace-level defaults — e.g. ns
`replication_clusters=[c1]`, topic `replicationClusters=[c1,c2]` should be
allowed on `c2`, but this short-circuit returns false.
Suggest only short-circuiting on the `allowed_clusters` hard gate (PIP-321),
then deferring to topic-level checks before falling back to ns
`replication_clusters`.
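A minimal sketch of that ordering, assuming a simplified model in which each policy is a plain `Set<String>` of cluster names. The class `ClusterGateSketch` and its parameters are hypothetical and are not Pulsar's API; local and global topic policies are collapsed into a single optional override for brevity.

```java
import java.util.Optional;
import java.util.Set;

/** Simplified model of the suggested check order; hypothetical, not Pulsar's actual API. */
public final class ClusterGateSketch {

    /**
     * @param cluster                  name of the current cluster
     * @param nsAllowedClusters        namespace allowed_clusters (empty means no hard gate)
     * @param nsReplicationClusters    namespace replication_clusters
     * @param topicReplicationClusters topic-level override; absent or empty means "not set"
     */
    public static boolean isCurrentClusterAllowed(String cluster,
                                                  Set<String> nsAllowedClusters,
                                                  Set<String> nsReplicationClusters,
                                                  Optional<Set<String>> topicReplicationClusters) {
        // 1. Hard gate: a non-empty allowed_clusters that excludes this cluster rejects outright.
        if (!nsAllowedClusters.isEmpty() && !nsAllowedClusters.contains(cluster)) {
            return false;
        }
        // 2. Topic-level replication clusters, when set, override the namespace default.
        if (topicReplicationClusters.isPresent() && !topicReplicationClusters.get().isEmpty()) {
            return topicReplicationClusters.get().contains(cluster);
        }
        // 3. Otherwise fall back to the namespace replication_clusters.
        return nsReplicationClusters.contains(cluster);
    }

    public static void main(String[] args) {
        // The case from this comment: ns replication_clusters=[c1],
        // topic replicationClusters=[c1,c2], current cluster c2.
        boolean allowed = isCurrentClusterAllowed(
                "c2", Set.of(), Set.of("c1"), Optional.of(Set.of("c1", "c2")));
        System.out.println(allowed); // prints true
    }
}
```

With this ordering the namespace `replication_clusters` value only decides the outcome when no topic-level list is set, while a non-empty `allowed_clusters` still rejects before any topic-level check runs.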
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -149,6 +149,15 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
});
waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
+ try {
+ admin2.topics().createMissedPartitions(topicName);
Review Comment:
Consider extracting these assertions into their own `@Test` method.
Only the global-topic-policy branch is exercised — adding cases for the
local-topic-level and namespace `allowed_clusters` branches plus a positive
happy-path assertion would catch inverted-boolean regressions.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -4136,6 +4153,38 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
this.pulsarChannelInitFactory = factory;
}
+ /**
+ * @return Triple [namespace policies, global topic policies, topic policies].
+ */
+ public CompletableFuture<Boolean> isCurrentClusterAllowed(@NonNull TopicName topicName) {
+ final String cluster = getPulsar().getConfig().getClusterName();
+ return getCombinedTopicPolicies(topicName).thenApply(triple -> {
+ Optional<TopicPolicies> topicP = triple.getRight();
+ Optional<TopicPolicies> globalTopicP = triple.getMiddle();
+ Optional<Policies> nsPolicies = triple.getLeft();
+ // Disabled a cluster for a namespace manually.
+ if (nsPolicies.isPresent() && !isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) {
+ return false;
+ }
+ // Manually enabled topic-level replication, which can skip to set a namespace-level replication.
+ if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) {
+ if (topicP.get().getReplicationClusters().contains(cluster)) {
+ return true;
+ } else {
+ return false;
+ }
Review Comment:
`if (x) return true; else return false;` → `return
topicP.get().getReplicationClusters().contains(cluster);` (same simplification
applies to the `globalTopicP` block below).
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -149,6 +149,15 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
});
waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
+ try {
+ admin2.topics().createMissedPartitions(topicName);
+ fail("The action that creates mission partitions should have thrown exception");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("is not allowed to be loaded up"));
Review Comment:
This couples the test to the English error string (and to the `polices`
typo). Prefer catching `PulsarAdminException` and asserting `getStatusCode() ==
400`.
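A minimal sketch of the status-code assertion, written against a stand-in exception so it runs without the Pulsar client on the classpath. `AdminException`, `createMissedPartitions`, and `rejectedStatus` here are hypothetical placeholders; in the real test the catch clause would name `PulsarAdminException`.

```java
/** Illustrates asserting on an HTTP status code instead of message text; all names are stand-ins. */
public final class StatusCodeAssertionSketch {

    /** Stand-in for an admin-client exception that carries an HTTP status code. */
    public static final class AdminException extends Exception {
        private final int statusCode;

        public AdminException(String message, int statusCode) {
            super(message);
            this.statusCode = statusCode;
        }

        public int getStatusCode() {
            return statusCode;
        }
    }

    /** Hypothetical call under test: always rejects with 400 and wording that may change. */
    public static void createMissedPartitions(String topic) throws AdminException {
        throw new AdminException("any wording, free to change: " + topic, 400);
    }

    /** Returns the status code of the rejection, or -1 if the call unexpectedly succeeded. */
    public static int rejectedStatus(String topic) {
        try {
            createMissedPartitions(topic);
            return -1;
        } catch (AdminException e) {
            // Assert on the status code, never on e.getMessage().
            return e.getStatusCode();
        }
    }

    public static void main(String[] args) {
        int status = rejectedStatus("persistent://public/default/tp");
        if (status != 400) {
            throw new AssertionError("expected 400 but got " + status);
        }
    }
}
```

Because the check reads only the status code, rewording the error message or fixing its typo later does not break the test.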
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -4136,6 +4153,38 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
this.pulsarChannelInitFactory = factory;
}
+ /**
+ * @return Triple [namespace policies, global topic policies, topic policies].
Review Comment:
Stale copy-paste from `getCombinedTopicPolicies` — this method returns
`CompletableFuture<Boolean>`, not a `Triple`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -550,30 +551,45 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
}
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
- getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
- if (metadata != null && metadata.partitions > 0) {
- validateNamespaceOperationAsync(topicName.getNamespaceObject(),
- NamespaceOperation.CREATE_TOPIC)
- .thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(e -> {
- log.error()
- .attr("topic", topicName)
- .log("Failed to create partitions for topic");
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return null;
- });
- } else {
- throw new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName));
- }
- }).exceptionally(ex -> {
+ Consumer<Throwable> errorHandler = ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error()
.attr("topic", topicName)
.log("Failed to create partitions for topic");
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
+ };
+ pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {
+ if (!allowed) {
+ resumeAsyncResponseExceptionally(asyncResponse,
+ new RestException(Status.BAD_REQUEST, String.format("Topic [%s] is not allowed to be loaded"
Review Comment:
Two nits: typo `polices` → `policies`; and "loaded up" misdescribes the
action — this is `createMissedPartitions`, not topic loading. Including the
current cluster name in the message would help operators identify which cluster
rejected the request.
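A sketch of one possible wording, assuming a small helper that receives the topic and the current cluster name. The class `RejectionMessageSketch`, the method `rejectionMessage`, and the message text are illustrative only and are not taken from the PR.

```java
/** Builds an operator-facing rejection message; the wording and names are hypothetical. */
public final class RejectionMessageSketch {

    public static String rejectionMessage(String topic, String cluster) {
        // Name the action that was rejected and the cluster that rejected it.
        return String.format(
                "Cannot create missed partitions for topic [%s]: cluster [%s] is not allowed"
                        + " by the namespace or topic policies",
                topic, cluster);
    }

    public static void main(String[] args) {
        System.out.println(rejectionMessage("persistent://public/default/tp", "c2"));
    }
}
```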