This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7f0429c1cd8 [fix][broker] Add topic consistency check (#24118) 7f0429c1cd8 is described below commit 7f0429c1cd81fbde6e0213a84e42f1ea55b5f7d2 Author: Zixuan Liu <node...@gmail.com> AuthorDate: Wed Mar 26 18:01:20 2025 +0800 [fix][broker] Add topic consistency check (#24118) Signed-off-by: Zixuan Liu <node...@gmail.com> --- .../pulsar/broker/namespace/NamespaceService.java | 35 +++-- .../pulsar/broker/service/BrokerService.java | 150 +++++++++++---------- .../apache/pulsar/broker/admin/AdminApiTest.java | 6 +- .../pulsar/broker/admin/TopicAutoCreationTest.java | 20 +-- .../broker/protocol/PulsarClientBasedHandler.java | 7 +- .../broker/service/ExclusiveProducerTest.java | 16 ++- .../pulsar/broker/service/PersistentTopicTest.java | 8 +- .../pulsar/broker/service/ReplicatorTest.java | 6 +- .../nonpersistent/NonPersistentTopicTest.java | 20 ++- .../service/persistent/PersistentTopicTest.java | 35 +---- .../pulsar/client/api/ConsumerCreationTest.java | 127 +++++++++++++++++ .../pulsar/client/api/ProducerCreationTest.java | 73 ++++++++++ .../pulsar/client/impl/LookupServiceTest.java | 82 +++++++++++ .../pulsar/client/api/PulsarClientException.java | 4 +- 14 files changed, 433 insertions(+), 156 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index b2ee299bb03..8f5cef1bdff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1412,17 +1412,26 @@ public class NamespaceService implements AutoCloseable { * Check topic exists( partitioned or non-partitioned ). */ public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) { - return pulsar.getBrokerService() - .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString())) - .thenCompose(metadata -> { - if (metadata.partitions > 0) { - return CompletableFuture.completedFuture( - TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); - } - return checkNonPartitionedTopicExists(topic) - .thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() - : TopicExistsInfo.newTopicNotExists()); - }); + // For non-persistent/persistent partitioned topic, which has metadata. + return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync( + topic.isPartitioned() ? TopicName.get(topic.getPartitionedTopicName()) : topic) + .thenCompose(metadata -> { + if (metadata.partitions > 0) { + if (!topic.isPartitioned()) { + return CompletableFuture.completedFuture( + TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); + } else { + if (topic.getPartitionIndex() < metadata.partitions) { + return CompletableFuture.completedFuture( + TopicExistsInfo.newNonPartitionedTopicExists()); + } + } + } + // Direct query the single topic. + return checkNonPartitionedTopicExists(topic).thenApply( + b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() : + TopicExistsInfo.newTopicNotExists()); + }); } /*** @@ -1443,12 +1452,12 @@ public class NamespaceService implements AutoCloseable { */ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(String topic) { TopicName topicName = TopicName.get(topic); - // "non-partitioned & non-persistent" topics only exist on the owner broker. + // "non-partitioned & non-persistent" topics only exist on the cache of the owner broker. return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned -> { // The current broker is the owner. if (isOwned) { CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = pulsar.getBrokerService() - .getTopic(topic, false); + .getTopics().get(topic); if (nonPersistentTopicFuture != null) { return nonPersistentTopicFuture.thenApply(Optional::isPresent); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 107bb01ffa0..dd3486496e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1061,6 +1061,70 @@ public class BrokerService implements Closeable { return getTopic(TopicName.get(topic), createIfMissing, properties); } + /** + * Validates that the topic is consistent with its partition metadata. + * + * This method ensures the topic (partitioned or non-partitioned) correctly + * matches the actual partitions in the metadata. Inconsistencies typically + * indicate configuration issues or metadata synchronization problems. + * + * This validation is particularly important in geo-replicated environments where + * topic metadata may not be fully synchronized across all regions, potentially + * leading to access errors if not properly handled. + * + * @param topicName The topic name to validate + * @return CompletableFuture that completes normally if validation passes, or + * completes exceptionally with NotAllowedException if validation fails + */ + private CompletableFuture<Void> validateTopicConsistency(TopicName topicName) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + // Skip validation for heartbeat namespace. + return CompletableFuture.completedFuture(null); + } + TopicName baseTopicName = + topicName.isPartitioned() ? TopicName.get(topicName.getPartitionedTopicName()) : topicName; + return fetchPartitionedTopicMetadataAsync(baseTopicName) + .thenCompose(metadata -> { + if (topicName.isPartitioned()) { + if (metadata.partitions == 0) { + // Edge case: When a complete partitioned topic name is provided but metadata shows 0 + // partitions. + // This indicates that the partitioned topic metadata doesn't exist. + // + // Resolution options: + // 1. Creates the partitioned topic via admin API. + // 2. Uses the base topic name and then rely on auto-creation the partitioned topic if + // enabled. + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException( + "Partition metadata not found for the partitioned topic: " + topicName)); + } + if (topicName.getPartitionIndex() >= metadata.partitions) { + final String errorMsg = + String.format( + "Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, + metadata.partitions); + log.warn(errorMsg); + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException(errorMsg)); + } + } else if (metadata.partitions > 0) { + // Edge case: Non-partitioned topic name was provided, but metadata indicates this is + // actually a partitioned + // topic (partitions > 0). + // + // Resolution: Must use the complete partitioned topic name('topic-name-partition-N'). + // + // This ensures proper routing to the specific partition and prevents ambiguity in topic + // addressing. + return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException( + "Found partitioned metadata for non-partitioned topic: " + topicName)); + } + return CompletableFuture.completedFuture(null); + }); + } + /** * Retrieves or creates a topic based on the specified parameters. * 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException. @@ -1107,30 +1171,9 @@ public class BrokerService implements Closeable { throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); }).thenCompose(optionalTopicPolicies -> { final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - if (topicName.isPartitioned()) { - final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(topicNameEntity) - .thenCompose((metadata) -> { - // Allow creating non-partitioned persistent topic that name includes - // `partition` - if (metadata.partitions == 0 - || topicName.getPartitionIndex() < metadata.partitions) { - return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(tpName, - createIfMissing, properties, topicPolicies)); - } else { - final String errorMsg = - String.format("Illegal topic partition name %s with max allowed " - + "%d partitions", topicName, metadata.partitions); - log.warn(errorMsg); - return FutureUtil.failedFuture( - new BrokerServiceException.NotAllowedException(errorMsg)); - } - }); - } else { - return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies)); - } + return topics.computeIfAbsent(topicName.toString(), + (tpName) -> loadOrCreatePersistentTopic(tpName, createIfMissing, properties, + topicPolicies)); }); }); } else { @@ -1144,29 +1187,10 @@ public class BrokerService implements Closeable { if (!topics.containsKey(topicName.toString())) { topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); } - if (topicName.isPartitioned()) { - final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { - if (topicName.getPartitionIndex() < metadata.partitions) { - return topics.computeIfAbsent(topicName.toString(), (name) -> { - topicEventsDispatcher - .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); - - CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name); - - CompletableFuture<Optional<Topic>> eventFuture = topicEventsDispatcher - .notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); - topicEventsDispatcher - .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); - return res; - }); - } - topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); - return CompletableFuture.completedFuture(Optional.empty()); - }); - } else if (createIfMissing) { + if (topicName.isPartitioned() || createIfMissing) { return topics.computeIfAbsent(topicName.toString(), (name) -> { - topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + topicEventsDispatcher + .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name); @@ -1176,14 +1200,13 @@ public class BrokerService implements Closeable { .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); return res; }); - } else { - CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString()); - if (topicFuture == null) { - topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); - topicFuture = CompletableFuture.completedFuture(Optional.empty()); - } - return topicFuture; } + CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString()); + if (topicFuture == null) { + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); + topicFuture = CompletableFuture.completedFuture(Optional.empty()); + } + return topicFuture; } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topicName, e); @@ -1357,8 +1380,9 @@ public class BrokerService implements Closeable { topicFuture.completeExceptionally(e); return topicFuture; } - CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic); - isOwner.thenRun(() -> { + checkTopicNsOwnership(topic) + .thenCompose((__) -> validateTopicConsistency(TopicName.get(topic))) + .thenRun(() -> { nonPersistentTopic.initialize() .thenCompose(__ -> nonPersistentTopic.checkReplication()) .thenRun(() -> { @@ -1375,17 +1399,7 @@ public class BrokerService implements Closeable { return null; }); }).exceptionally(e -> { - log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause()); - // CheckTopicNsOwnership fail dont create nonPersistentTopic, when topic do lookup will find the correct - // broker. When client get non-persistent-partitioned topic - // metadata will the non-persistent-topic will be created. - // so we should add checkTopicNsOwnership logic otherwise the topic will be created - // if it dont own by this broker,we should return success - // otherwise it will keep retrying getPartitionedTopicMetadata - topicFuture.complete(Optional.of(nonPersistentTopic)); - // after get metadata return success, we should delete this topic from this broker, because this topic not - // owner by this broker and it don't initialize and checkReplication - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); return null; }); @@ -1772,8 +1786,8 @@ public class BrokerService implements Closeable { : CompletableFuture.completedFuture(null); CompletableFuture<Void> isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName); - - maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated) + maxTopicsCheck.thenCompose(partitionedTopicMetadata -> validateTopicConsistency(topicName)) + .thenCompose(__ -> isTopicAlreadyMigrated) .thenCompose(__ -> getManagedLedgerConfig(topicName, topicPolicies)) .thenAccept(managedLedgerConfig -> { if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index cea43cc9345..e9ca122bba1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2912,10 +2912,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception { - // Force to create a topic - publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0); - assertEquals(admin.topics().getList("prop-xyz/ns1"), - List.of("persistent://prop-xyz/ns1/ds2-partition-2")); + // Create a topic + admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/ds2", 3); // create consumer and subscription @Cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index c90ad15242c..e06abd972c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.net.InetSocketAddress; import java.util.List; @@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -101,16 +103,14 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { final String partition = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-partition-0"; - producer = pulsarClient.newProducer() - .topic(partition) - .create(); - - partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName); - topics = admin.topics().getList(namespaceName); - assertEquals(partitionedTopics.size(), 0); - assertEquals(topics.size(), 1); - - producer.close(); + // The Pulsar doesn't automatically create the metadata for the single partition, so the producer creation + // will fail. + assertThrows(NotAllowedException.class, () -> { + @Cleanup + Producer<byte[]> ignored = pulsarClient.newProducer() + .topic(partition) + .create(); + }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java index ed9881a8cad..3d24fe3ce38 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java @@ -79,9 +79,11 @@ public class PulsarClientBasedHandler implements ProtocolHandler { @Override public void start(BrokerService service) { + @Cleanup + PulsarAdmin admin = null; try { final var port = service.getPulsar().getListenPortHTTP().orElseThrow(); - @Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build(); + admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build(); try { admin.clusters().createCluster(cluster, ClusterData.builder() .serviceUrl(service.getPulsar().getWebServiceAddress()) @@ -103,6 +105,7 @@ public class PulsarClientBasedHandler implements ProtocolHandler { throw new RuntimeException(e); } try { + admin.topics().createPartitionedTopic(topic, partitions); final var port = service.getListenPort().orElseThrow(); client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + port).build(); readers = new ArrayList<>(); @@ -122,7 +125,7 @@ public class PulsarClientBasedHandler implements ProtocolHandler { }); } }); - } catch (PulsarClientException e) { + } catch (PulsarClientException | PulsarAdminException e) { throw new RuntimeException(e); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 5f95e557b8c..33a34d3fff4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -19,20 +19,21 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - +import io.netty.util.HashedWheelTimer; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import io.netty.util.HashedWheelTimer; import lombok.Cleanup; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException; +import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; @@ -316,6 +317,7 @@ public class ExclusiveProducerTest extends BrokerTestBase { public void topicDeleted(String ignored, boolean partitioned) throws Exception { String topic = newTopic("persistent", partitioned); + @Cleanup Producer<String> p1 = pulsarClient.newProducer(Schema.STRING) .topic(topic) .accessMode(ProducerAccessMode.Exclusive) @@ -329,8 +331,14 @@ public class ExclusiveProducerTest extends BrokerTestBase { admin.topics().delete(topic, true); } - // The producer should be able to publish again on the topic - p1.send("msg-2"); + if (!partitioned) { + // The producer should be able to publish again on the topic + p1.send("msg-2"); + } else { + // The partitioned topic is deleted, the producer should not be able to publish again on the topic. + // Partitioned metadata is required to publish messages to the topic. + assertThrows(TimeoutException.class, () -> p1.send("msg-2")); + } } @Test(dataProvider = "topics") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 92b767104f6..0863e1ec5c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -127,7 +127,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; @@ -141,7 +140,7 @@ import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.CompactorMXBean; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.pulsar.metadata.api.MetadataStoreException; -import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; +import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore.OperationType; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -1464,8 +1463,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate(); // create topic - brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .createPartitionedTopic(TopicName.get(successTopicName), new PartitionedTopicMetadata(2)); PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced"); @@ -1477,8 +1474,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { assertFalse((boolean) isClosingOrDeletingField.get(topic)); metadataStore.failConditional(new MetadataStoreException("injected error"), (op, path) -> - op == FaultInjectionMetadataStore.OperationType.PUT && - path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic")); + op == OperationType.EXISTS && path.equals("/admin/flags/policies-readonly")); try { topic.delete().get(); fail(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index d1d7358f346..60932e09116 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -71,7 +71,7 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; +import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats; @@ -1208,7 +1208,7 @@ public class ReplicatorTest extends ReplicatorTestBase { if (!isPartitionedTopic) { fail("Topic creation should not fail without any partitioned topic"); } - assertTrue(e.getCause() instanceof NamingException); + assertTrue(e.getCause() instanceof NotAllowedException); } // non-persistent topic test @@ -1221,7 +1221,7 @@ public class ReplicatorTest extends ReplicatorTestBase { if (!isPartitionedTopic) { fail("Topic creation should not fail without any partitioned topic"); } - assertTrue(e.getCause() instanceof NamingException); + assertTrue(e.getCause() instanceof NotAllowedException); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index e0d6a432bda..a654b477b45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.broker.service.nonpersistent; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Optional; import java.util.UUID; @@ -43,11 +47,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.fail; - @Test(groups = "broker") public class NonPersistentTopicTest extends BrokerTestBase { @@ -113,19 +112,16 @@ public class NonPersistentTopicTest extends BrokerTestBase { } @Test - public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException { + public void testCreateNonExistentPartitions() throws PulsarAdminException { final String topicName = "non-persistent://prop/ns-abc/testCreateNonExistentPartitions"; admin.topics().createPartitionedTopic(topicName, 4); TopicName partition = TopicName.get(topicName).getPartition(4); - try { + assertThrows(PulsarClientException.NotAllowedException.class, () -> { @Cleanup - Producer<byte[]> producer = pulsarClient.newProducer() + Producer<byte[]> ignored = pulsarClient.newProducer() .topic(partition.toString()) .create(); - fail("unexpected behaviour"); - } catch (PulsarClientException.TopicDoesNotExistException ignored) { - - } + }); assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 3e74ab5e1ac..12b9b0568b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.service.persistent; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -36,6 +34,8 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -578,37 +578,6 @@ public class PersistentTopicTest extends BrokerTestBase { Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } - @Test - public void testCompatibilityWithPartitionKeyword() throws PulsarAdminException, PulsarClientException { - final String topicName = "persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword"; - TopicName topicNameEntity = TopicName.get(topicName); - String partition2 = topicNameEntity.getPartition(2).toString(); - // Create a non-partitioned topic with -partition- keyword - Producer<byte[]> producer = pulsarClient.newProducer() - .topic(partition2) - .create(); - List<String> topics = admin.topics().getList("prop/ns-abc"); - // Close previous producer to simulate reconnect - producer.close(); - // Disable auto topic creation - conf.setAllowAutoTopicCreation(false); - // Check the topic exist in the list. - Assert.assertTrue(topics.contains(partition2)); - // Check this topic has no partition metadata. - Assert.assertThrows(PulsarAdminException.NotFoundException.class, - () -> admin.topics().getPartitionedTopicMetadata(topicName)); - // Reconnect to the broker and expect successful because the topic has existed in the broker. - producer = pulsarClient.newProducer() - .topic(partition2) - .create(); - producer.close(); - // Check the topic exist in the list again. - Assert.assertTrue(topics.contains(partition2)); - // Check this topic has no partition metadata again. - Assert.assertThrows(PulsarAdminException.NotFoundException.class, - () -> admin.topics().getPartitionedTopicMetadata(topicName)); - } - @Test public void testDeleteTopicFail() throws Exception { final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_" diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java new file mode 100644 index 00000000000..a81dbe02b34 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertThrows; +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class ConsumerCreationTest extends ProducerConsumerBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "topicDomainProvider") + public Object[][] topicDomainProvider() { + return new Object[][]{ + {TopicDomain.persistent}, + {TopicDomain.non_persistent} + }; + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerWhenTopicTypeMismatch(TopicDomain domain) + throws PulsarAdminException, PulsarClientException { + String nonPartitionedTopic = + TopicName.get(domain.value(), "public", "default", + "testCreateConsumerWhenTopicTypeMismatch-nonPartitionedTopic") + .toString(); + admin.topics().createNonPartitionedTopic(nonPartitionedTopic); + + // Topic type is non-partitioned, Trying to create consumer on partitioned topic. + assertThrows(NotAllowedException.class, () -> { + @Cleanup + Consumer<byte[]> ignored = + pulsarClient.newConsumer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString()) + .subscriptionName("my-sub").subscribe(); + }); + + // Topic type is partitioned, Trying to create consumer on non-partitioned topic. + String partitionedTopic = TopicName.get(domain.value(), "public", "default", + "testCreateConsumerWhenTopicTypeMismatch-partitionedTopic") + .toString(); + admin.topics().createPartitionedTopic(partitionedTopic, 3); + + // Works fine because the lookup can help our to find the correct topic. + { + @Cleanup + Consumer<byte[]> ignored = + pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(2).toString()) + .subscriptionName("my-sub").subscribe(); + } + + // Partition index is out of range. + assertThrows(NotAllowedException.class, () -> { + @Cleanup + Consumer<byte[]> ignored = + pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString()) + .subscriptionName("my-sub").subscribe(); + }); + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain domain) + throws PulsarAdminException, PulsarClientException { + testCreateConsumerWhenSinglePartitionIsDeleted(domain, false); + testCreateConsumerWhenSinglePartitionIsDeleted(domain, true); + } + + private void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain domain, boolean allowAutoTopicCreation) + throws PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(allowAutoTopicCreation); + + String partitionedTopic = TopicName.get(domain.value(), "public", "default", + "testCreateConsumerWhenSinglePartitionIsDeleted-" + allowAutoTopicCreation) + .toString(); + admin.topics().createPartitionedTopic(partitionedTopic, 3); + admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString()); + + // Non-persistent topic only have the metadata, and no partition, so it works fine. + if (allowAutoTopicCreation || domain.equals(TopicDomain.non_persistent)) { + @Cleanup + Consumer<byte[]> ignored = + pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe(); + } else { + assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> { + @Cleanup + Consumer<byte[]> ignored = + pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe(); + }); + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java index d5734588288..cd75383a487 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java @@ -18,8 +18,11 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; +import lombok.Cleanup; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.TopicDomain; @@ -191,4 +194,74 @@ public class ProducerCreationTest extends ProducerConsumerBase { Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName)); } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateProducerWhenTopicTypeMismatch(TopicDomain domain) + throws PulsarAdminException, PulsarClientException { + String nonPartitionedTopic = + TopicName.get(domain.value(), "public", "default", + "testCreateProducerWhenTopicTypeMismatch-nonPartitionedTopic") + .toString(); + admin.topics().createNonPartitionedTopic(nonPartitionedTopic); + + // Topic type is non-partitioned, trying to create producer on the complete partitioned topic. + // Should throw NotAllowedException. + assertThrows(NotAllowedException.class, () -> { + @Cleanup + Producer<byte[]> ignored = + pulsarClient.newProducer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString()) + .create(); + }); + + // Topic type is partitioned, trying to create producer on the base partitioned topic. + String partitionedTopic = TopicName.get(domain.value(), "public", "default", + "testCreateProducerWhenTopicTypeMismatch-partitionedTopic") + .toString(); + admin.topics().createPartitionedTopic(partitionedTopic, 3); + + // Works fine because the lookup can help our to find all the topics. + { + @Cleanup + Producer<byte[]> ignored = + pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartitionedTopicName()) + .create(); + } + + // Partition index is out of range. + assertThrows(NotAllowedException.class, () -> { + @Cleanup + Producer<byte[]> ignored = + pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString()) + .create(); + }); + } + + @Test(dataProvider = "topicDomainProvider") + public void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain domain) + throws PulsarAdminException, PulsarClientException { + testCreateProducerWhenSinglePartitionIsDeleted(domain, false); + testCreateProducerWhenSinglePartitionIsDeleted(domain, true); + } + + private void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain domain, boolean allowAutoTopicCreation) + throws PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(allowAutoTopicCreation); + + String partitionedTopic = TopicName.get(domain.value(), "public", "default", + "testCreateProducerWhenSinglePartitionIsDeleted-" + allowAutoTopicCreation) + .toString(); + admin.topics().createPartitionedTopic(partitionedTopic, 3); + admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString()); + + // Non-persistent topic only have the metadata, and no partition, so it works fine. + if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) { + @Cleanup + Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create(); + } else { + assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> { + @Cleanup + Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create(); + }); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java index 59cb7ae03d0..c4ef53b292b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java @@ -19,13 +19,19 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.util.Collection; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.testng.annotations.AfterClass; @@ -125,4 +131,80 @@ public class LookupServiceTest extends ProducerConsumerBase { admin.topics().delete(nonPartitionedTopic, false); } + @Test(dataProvider = "isUsingHttpLookup") + public void testGetPartitionedTopicMetadataByPulsarClient(boolean isUsingHttpLookup) throws PulsarAdminException { + LookupService lookupService = getLookupService(isUsingHttpLookup); + + // metadataAutoCreationEnabled is true. + assertThat(lookupService.getPartitionedTopicMetadata( + TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), true)) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(n -> n.partitions == 0); + + // metadataAutoCreationEnabled is true. + // Allow the get the metadata of single partition topic, because the auto-creation is enabled. + // But the producer/consumer is unavailable because the topic doesn't have the metadata. + assertThat(lookupService.getPartitionedTopicMetadata( + TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + "-partition-10"), + true)) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(n -> n.partitions == 0); + + Class<? extends Throwable> expectedExceptionClass = + isUsingHttpLookup ? PulsarClientException.NotFoundException.class : + PulsarClientException.TopicDoesNotExistException.class; + // metadataAutoCreationEnabled is false. + assertThat(lookupService.getPartitionedTopicMetadata( + TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), false)) + .failsWithin(3, TimeUnit.SECONDS) + .withThrowableThat() + .withCauseInstanceOf(expectedExceptionClass); + + // metadataAutoCreationEnabled is false. + assertThat(lookupService.getPartitionedTopicMetadata( + TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + "-partition-10"), + false)) + .failsWithin(3, TimeUnit.SECONDS) + .withThrowableThat() + .withCauseInstanceOf(expectedExceptionClass); + + // Verify the topic exists, and the metadataAutoCreationEnabled is false. + String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(nonPartitionedTopic); + assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(nonPartitionedTopic), false)) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(n -> n.partitions == 0); + + String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String partitionedTopicWithPartitionIndex = partitionedTopic + "-partition-10"; + admin.topics().createPartitionedTopic(partitionedTopic, 20); + assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopic), false)) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(n -> n.partitions == 20); + assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopicWithPartitionIndex), false)) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(n -> n.partitions == 0); + } + + @Test + public void testGetPartitionedTopicMedataByAdmin() throws PulsarAdminException { + String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String partitionedTopicWithPartitionIndex = partitionedTopic + "-partition-10"; + // No topic, so throw the NotFound. + // BTW: The admin api doesn't allow to creat the metadata of topic default. + assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics() + .getPartitionedTopicMetadata(nonPartitionedTopic)); + assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics() + .getPartitionedTopicMetadata(partitionedTopic)); + assertThrows(PulsarAdminException.NotFoundException.class, + () -> admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex)); + + admin.topics().createNonPartitionedTopic(nonPartitionedTopic); + assertEquals(admin.topics().getPartitionedTopicMetadata(nonPartitionedTopic).partitions, 0); + + admin.topics().createPartitionedTopic(partitionedTopic, 20); + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopic).partitions, 20); + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex).partitions, 0); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index b2c9b2b697b..32cec950a69 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -961,7 +961,9 @@ public class PulsarClientException extends IOException { public static Throwable wrap(Throwable t, String msg) { msg += "\n" + t.getMessage(); // wrap an exception with new message info - if (t instanceof TimeoutException) { + if (t instanceof TopicDoesNotExistException) { + return new TopicDoesNotExistException(msg); + } else if (t instanceof TimeoutException) { return new TimeoutException(msg); } else if (t instanceof InvalidConfigurationException) { return new InvalidConfigurationException(msg);