This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9eee8e235cb5588c1784908f4bc5ba175ef69fd4 Author: Lari Hotari <lhot...@apache.org> AuthorDate: Wed Apr 23 13:52:27 2025 +0300 Revert "[fix][broker] Add topic consistency check (#24118)" This reverts commit 61ab666cdb79ce3ca21fe1d7666bbd1ab5cf8670. --- .../pulsar/broker/namespace/NamespaceService.java | 35 ++--- .../pulsar/broker/service/BrokerService.java | 150 ++++++++++----------- .../apache/pulsar/broker/admin/AdminApiTest.java | 6 +- .../pulsar/broker/admin/TopicAutoCreationTest.java | 20 +-- .../broker/protocol/PulsarClientBasedHandler.java | 7 +- .../broker/service/ExclusiveProducerTest.java | 16 +-- .../pulsar/broker/service/PersistentTopicTest.java | 8 +- .../pulsar/broker/service/ReplicatorTest.java | 6 +- .../nonpersistent/NonPersistentTopicTest.java | 20 +-- .../service/persistent/PersistentTopicTest.java | 35 ++++- .../pulsar/client/api/ConsumerCreationTest.java | 127 ----------------- .../pulsar/client/api/ProducerCreationTest.java | 73 ---------- .../pulsar/client/impl/LookupServiceTest.java | 82 ----------- .../pulsar/client/api/PulsarClientException.java | 4 +- 14 files changed, 156 insertions(+), 433 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 5320a50e10f..f3fb17c02fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1421,26 +1421,17 @@ public class NamespaceService implements AutoCloseable { * Check topic exists( partitioned or non-partitioned ). */ public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) { - // For non-persistent/persistent partitioned topic, which has metadata. - return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync( - topic.isPartitioned() ? TopicName.get(topic.getPartitionedTopicName()) : topic) - .thenCompose(metadata -> { - if (metadata.partitions > 0) { - if (!topic.isPartitioned()) { - return CompletableFuture.completedFuture( - TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); - } else { - if (topic.getPartitionIndex() < metadata.partitions) { - return CompletableFuture.completedFuture( - TopicExistsInfo.newNonPartitionedTopicExists()); - } - } - } - // Direct query the single topic. - return checkNonPartitionedTopicExists(topic).thenApply( - b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() : - TopicExistsInfo.newTopicNotExists()); - }); + return pulsar.getBrokerService() + .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString())) + .thenCompose(metadata -> { + if (metadata.partitions > 0) { + return CompletableFuture.completedFuture( + TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); + } + return checkNonPartitionedTopicExists(topic) + .thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() + : TopicExistsInfo.newTopicNotExists()); + }); } /*** @@ -1461,12 +1452,12 @@ public class NamespaceService implements AutoCloseable { */ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(String topic) { TopicName topicName = TopicName.get(topic); - // "non-partitioned & non-persistent" topics only exist on the cache of the owner broker. + // "non-partitioned & non-persistent" topics only exist on the owner broker. return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned -> { // The current broker is the owner. if (isOwned) { CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = pulsar.getBrokerService() - .getTopics().get(topic); + .getTopic(topic, false); if (nonPersistentTopicFuture != null) { return nonPersistentTopicFuture.thenApply(Optional::isPresent); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 79b214def01..766af721a6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1040,70 +1040,6 @@ public class BrokerService implements Closeable { return getTopic(TopicName.get(topic), createIfMissing, properties); } - /** - * Validates that the topic is consistent with its partition metadata. - * - * This method ensures the topic (partitioned or non-partitioned) correctly - * matches the actual partitions in the metadata. Inconsistencies typically - * indicate configuration issues or metadata synchronization problems. - * - * This validation is particularly important in geo-replicated environments where - * topic metadata may not be fully synchronized across all regions, potentially - * leading to access errors if not properly handled. - * - * @param topicName The topic name to validate - * @return CompletableFuture that completes normally if validation passes, or - * completes exceptionally with NotAllowedException if validation fails - */ - private CompletableFuture<Void> validateTopicConsistency(TopicName topicName) { - if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { - // Skip validation for heartbeat namespace. - return CompletableFuture.completedFuture(null); - } - TopicName baseTopicName = - topicName.isPartitioned() ? TopicName.get(topicName.getPartitionedTopicName()) : topicName; - return fetchPartitionedTopicMetadataAsync(baseTopicName) - .thenCompose(metadata -> { - if (topicName.isPartitioned()) { - if (metadata.partitions == 0) { - // Edge case: When a complete partitioned topic name is provided but metadata shows 0 - // partitions. - // This indicates that the partitioned topic metadata doesn't exist. - // - // Resolution options: - // 1. Creates the partitioned topic via admin API. - // 2. Uses the base topic name and then rely on auto-creation the partitioned topic if - // enabled. - return FutureUtil.failedFuture( - new BrokerServiceException.NotAllowedException( - "Partition metadata not found for the partitioned topic: " + topicName)); - } - if (topicName.getPartitionIndex() >= metadata.partitions) { - final String errorMsg = - String.format( - "Illegal topic partition name %s with max allowed " - + "%d partitions", topicName, - metadata.partitions); - log.warn(errorMsg); - return FutureUtil.failedFuture( - new BrokerServiceException.NotAllowedException(errorMsg)); - } - } else if (metadata.partitions > 0) { - // Edge case: Non-partitioned topic name was provided, but metadata indicates this is - // actually a partitioned - // topic (partitions > 0). - // - // Resolution: Must use the complete partitioned topic name('topic-name-partition-N'). - // - // This ensures proper routing to the specific partition and prevents ambiguity in topic - // addressing. - return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException( - "Found partitioned metadata for non-partitioned topic: " + topicName)); - } - return CompletableFuture.completedFuture(null); - }); - } - /** * Retrieves or creates a topic based on the specified parameters. * 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException. @@ -1150,9 +1086,30 @@ public class BrokerService implements Closeable { throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); }).thenCompose(optionalTopicPolicies -> { final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - return topics.computeIfAbsent(topicName.toString(), - (tpName) -> loadOrCreatePersistentTopic(tpName, createIfMissing, properties, - topicPolicies)); + if (topicName.isPartitioned()) { + final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); + return fetchPartitionedTopicMetadataAsync(topicNameEntity) + .thenCompose((metadata) -> { + // Allow creating non-partitioned persistent topic that name includes + // `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return topics.computeIfAbsent(topicName.toString(), (tpName) -> + loadOrCreatePersistentTopic(tpName, + createIfMissing, properties, topicPolicies)); + } else { + final String errorMsg = + String.format("Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, metadata.partitions); + log.warn(errorMsg); + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException(errorMsg)); + } + }); + } else { + return topics.computeIfAbsent(topicName.toString(), (tpName) -> + loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies)); + } }); }); } else { @@ -1166,10 +1123,29 @@ public class BrokerService implements Closeable { if (!topics.containsKey(topicName.toString())) { topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); } - if (topicName.isPartitioned() || createIfMissing) { + if (topicName.isPartitioned()) { + final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { + if (topicName.getPartitionIndex() < metadata.partitions) { + return topics.computeIfAbsent(topicName.toString(), (name) -> { + topicEventsDispatcher + .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + + CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name); + + CompletableFuture<Optional<Topic>> eventFuture = topicEventsDispatcher + .notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); + topicEventsDispatcher + .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); + return res; + }); + } + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); + return CompletableFuture.completedFuture(Optional.empty()); + }); + } else if (createIfMissing) { return topics.computeIfAbsent(topicName.toString(), (name) -> { - topicEventsDispatcher - .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name); @@ -1179,13 +1155,14 @@ public class BrokerService implements Closeable { .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); return res; }); + } else { + CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString()); + if (topicFuture == null) { + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); + topicFuture = CompletableFuture.completedFuture(Optional.empty()); + } + return topicFuture; } - CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString()); - if (topicFuture == null) { - topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); - topicFuture = CompletableFuture.completedFuture(Optional.empty()); - } - return topicFuture; } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topicName, e); @@ -1341,9 +1318,8 @@ public class BrokerService implements Closeable { topicFuture.completeExceptionally(e); return topicFuture; } - checkTopicNsOwnership(topic) - .thenCompose((__) -> validateTopicConsistency(TopicName.get(topic))) - .thenRun(() -> { + CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic); + isOwner.thenRun(() -> { nonPersistentTopic.initialize() .thenCompose(__ -> nonPersistentTopic.checkReplication()) .thenRun(() -> { @@ -1360,7 +1336,17 @@ public class BrokerService implements Closeable { return null; }); }).exceptionally(e -> { - topicFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause()); + // CheckTopicNsOwnership fail dont create nonPersistentTopic, when topic do lookup will find the correct + // broker. When client get non-persistent-partitioned topic + // metadata will the non-persistent-topic will be created. + // so we should add checkTopicNsOwnership logic otherwise the topic will be created + // if it dont own by this broker,we should return success + // otherwise it will keep retrying getPartitionedTopicMetadata + topicFuture.complete(Optional.of(nonPersistentTopic)); + // after get metadata return success, we should delete this topic from this broker, because this topic not + // owner by this broker and it don't initialize and checkReplication + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); @@ -1728,8 +1714,8 @@ public class BrokerService implements Closeable { : CompletableFuture.completedFuture(null); CompletableFuture<Void> isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName); - maxTopicsCheck.thenCompose(partitionedTopicMetadata -> validateTopicConsistency(topicName)) - .thenCompose(__ -> isTopicAlreadyMigrated) + + maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated) .thenCompose(__ -> getManagedLedgerConfig(topicName, topicPolicies)) .thenAccept(managedLedgerConfig -> { if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 4ea7105dba1..9ebeaeb7853 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2914,8 +2914,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { @Test public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception { - // Create a topic - admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/ds2", 3); + // Force to create a topic + publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0); + assertEquals(admin.topics().getList("prop-xyz/ns1"), + List.of("persistent://prop-xyz/ns1/ds2-partition-2")); // create consumer and subscription @Cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index e06abd972c1..c90ad15242c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -23,7 +23,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.net.InetSocketAddress; import java.util.List; @@ -40,7 +39,6 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -103,14 +101,16 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { final String partition = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-partition-0"; - // The Pulsar doesn't automatically create the metadata for the single partition, so the producer creation - // will fail. - assertThrows(NotAllowedException.class, () -> { - @Cleanup - Producer<byte[]> ignored = pulsarClient.newProducer() - .topic(partition) - .create(); - }); + producer = pulsarClient.newProducer() + .topic(partition) + .create(); + + partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName); + topics = admin.topics().getList(namespaceName); + assertEquals(partitionedTopics.size(), 0); + assertEquals(topics.size(), 1); + + producer.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java index 3d24fe3ce38..ed9881a8cad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java @@ -79,11 +79,9 @@ public class PulsarClientBasedHandler implements ProtocolHandler { @Override public void start(BrokerService service) { - @Cleanup - PulsarAdmin admin = null; try { final var port = service.getPulsar().getListenPortHTTP().orElseThrow(); - admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build(); + @Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build(); try { admin.clusters().createCluster(cluster, ClusterData.builder() .serviceUrl(service.getPulsar().getWebServiceAddress()) @@ -105,7 +103,6 @@ public class PulsarClientBasedHandler implements ProtocolHandler { throw new RuntimeException(e); } try { - admin.topics().createPartitionedTopic(topic, partitions); final var port = service.getListenPort().orElseThrow(); client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + port).build(); readers = new ArrayList<>(); @@ -125,7 +122,7 @@ public class PulsarClientBasedHandler implements ProtocolHandler { }); } }); - } catch (PulsarClientException | PulsarAdminException e) { + } catch (PulsarClientException e) { throw new RuntimeException(e); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 33a34d3fff4..5f95e557b8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -19,21 +19,20 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import io.netty.util.HashedWheelTimer; + import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import io.netty.util.HashedWheelTimer; import lombok.Cleanup; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException; -import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; @@ -317,7 +316,6 @@ public class ExclusiveProducerTest extends BrokerTestBase { public void topicDeleted(String ignored, boolean partitioned) throws Exception { String topic = newTopic("persistent", partitioned); - @Cleanup Producer<String> p1 = pulsarClient.newProducer(Schema.STRING) .topic(topic) .accessMode(ProducerAccessMode.Exclusive) @@ -331,14 +329,8 @@ public class ExclusiveProducerTest extends BrokerTestBase { admin.topics().delete(topic, true); } - if (!partitioned) { - // The producer should be able to publish again on the topic - p1.send("msg-2"); - } else { - // The partitioned topic is deleted, the producer should not be able to publish again on the topic. - // Partitioned metadata is required to publish messages to the topic. - assertThrows(TimeoutException.class, () -> p1.send("msg-2")); - } + // The producer should be able to publish again on the topic + p1.send("msg-2"); } @Test(dataProvider = "topics") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d64e36718b9..c18c2c5c5d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -121,6 +121,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; @@ -135,7 +136,7 @@ import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.CompactorMXBean; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.pulsar.metadata.api.MetadataStoreException; -import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore.OperationType; +import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -1468,6 +1469,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate(); // create topic + brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .createPartitionedTopic(TopicName.get(successTopicName), new PartitionedTopicMetadata(2)); PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced"); @@ -1479,7 +1482,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { assertFalse((boolean) isClosingOrDeletingField.get(topic)); metadataStore.failConditional(new MetadataStoreException("injected error"), (op, path) -> - op == OperationType.EXISTS && path.equals("/admin/flags/policies-readonly")); + op == FaultInjectionMetadataStore.OperationType.PUT && + path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic")); try { topic.delete().get(); fail(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 319d9e48453..044b36992a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -66,7 +66,7 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; +import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -1189,7 +1189,7 @@ public class ReplicatorTest extends ReplicatorTestBase { if (!isPartitionedTopic) { fail("Topic creation should not fail without any partitioned topic"); } - assertTrue(e.getCause() instanceof NotAllowedException); + assertTrue(e.getCause() instanceof NamingException); } // non-persistent topic test @@ -1202,7 +1202,7 @@ public class ReplicatorTest extends ReplicatorTestBase { if (!isPartitionedTopic) { fail("Topic creation should not fail without any partitioned topic"); } - assertTrue(e.getCause() instanceof NotAllowedException); + assertTrue(e.getCause() instanceof NamingException); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 468cd3e4109..e2aec70fb11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -18,10 +18,6 @@ */ package org.apache.pulsar.broker.service.nonpersistent; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Optional; import java.util.UUID; @@ -48,6 +44,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; + @Test(groups = "broker") public class NonPersistentTopicTest extends BrokerTestBase { @@ -113,16 +114,19 @@ public class NonPersistentTopicTest extends BrokerTestBase { } @Test - public void testCreateNonExistentPartitions() throws PulsarAdminException { + public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException { final String topicName = "non-persistent://prop/ns-abc/testCreateNonExistentPartitions"; admin.topics().createPartitionedTopic(topicName, 4); TopicName partition = TopicName.get(topicName).getPartition(4); - assertThrows(PulsarClientException.NotAllowedException.class, () -> { + try { @Cleanup - Producer<byte[]> ignored = pulsarClient.newProducer() + Producer<byte[]> producer = pulsarClient.newProducer() .topic(partition.toString()) .create(); - }); + fail("unexpected behaviour"); + } catch (PulsarClientException.TopicDoesNotExistException ignored) { + + } assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 12fe82fc954..6bbd598b454 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -34,8 +36,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -577,6 +577,37 @@ public class PersistentTopicTest extends BrokerTestBase { Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } + @Test + public void testCompatibilityWithPartitionKeyword() throws PulsarAdminException, PulsarClientException { + final String topicName = "persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword"; + TopicName topicNameEntity = TopicName.get(topicName); + String partition2 = topicNameEntity.getPartition(2).toString(); + // Create a non-partitioned topic with -partition- keyword + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(partition2) + .create(); + List<String> topics = admin.topics().getList("prop/ns-abc"); + // Close previous producer to simulate reconnect + producer.close(); + // Disable auto topic creation + conf.setAllowAutoTopicCreation(false); + // Check the topic exist in the list. + Assert.assertTrue(topics.contains(partition2)); + // Check this topic has no partition metadata. + Assert.assertThrows(PulsarAdminException.NotFoundException.class, + () -> admin.topics().getPartitionedTopicMetadata(topicName)); + // Reconnect to the broker and expect successful because the topic has existed in the broker. + producer = pulsarClient.newProducer() + .topic(partition2) + .create(); + producer.close(); + // Check the topic exist in the list again. + Assert.assertTrue(topics.contains(partition2)); + // Check this topic has no partition metadata again. + Assert.assertThrows(PulsarAdminException.NotFoundException.class, + () -> admin.topics().getPartitionedTopicMetadata(topicName)); + } + @Test public void testDeleteTopicFail() throws Exception { final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_" diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java deleted file mode 100644 index a81dbe02b34..00000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.client.api; - -import static org.testng.Assert.assertThrows; -import lombok.Cleanup; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.naming.TopicName; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -@Test(groups = "broker-api") -public class ConsumerCreationTest extends ProducerConsumerBase { - - @BeforeMethod - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @AfterMethod(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @DataProvider(name = "topicDomainProvider") - public Object[][] topicDomainProvider() { - return new Object[][]{ - {TopicDomain.persistent}, - {TopicDomain.non_persistent} - }; - } - - @Test(dataProvider = "topicDomainProvider") - public void testCreateConsumerWhenTopicTypeMismatch(TopicDomain domain) - throws PulsarAdminException, PulsarClientException { - String nonPartitionedTopic = - TopicName.get(domain.value(), "public", "default", - "testCreateConsumerWhenTopicTypeMismatch-nonPartitionedTopic") - .toString(); - admin.topics().createNonPartitionedTopic(nonPartitionedTopic); - - // Topic type is non-partitioned, Trying to create consumer on partitioned topic. - assertThrows(NotAllowedException.class, () -> { - @Cleanup - Consumer<byte[]> ignored = - pulsarClient.newConsumer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString()) - .subscriptionName("my-sub").subscribe(); - }); - - // Topic type is partitioned, Trying to create consumer on non-partitioned topic. - String partitionedTopic = TopicName.get(domain.value(), "public", "default", - "testCreateConsumerWhenTopicTypeMismatch-partitionedTopic") - .toString(); - admin.topics().createPartitionedTopic(partitionedTopic, 3); - - // Works fine because the lookup can help our to find the correct topic. - { - @Cleanup - Consumer<byte[]> ignored = - pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(2).toString()) - .subscriptionName("my-sub").subscribe(); - } - - // Partition index is out of range. - assertThrows(NotAllowedException.class, () -> { - @Cleanup - Consumer<byte[]> ignored = - pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString()) - .subscriptionName("my-sub").subscribe(); - }); - } - - @Test(dataProvider = "topicDomainProvider") - public void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain domain) - throws PulsarAdminException, PulsarClientException { - testCreateConsumerWhenSinglePartitionIsDeleted(domain, false); - testCreateConsumerWhenSinglePartitionIsDeleted(domain, true); - } - - private void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain domain, boolean allowAutoTopicCreation) - throws PulsarAdminException, PulsarClientException { - conf.setAllowAutoTopicCreation(allowAutoTopicCreation); - - String partitionedTopic = TopicName.get(domain.value(), "public", "default", - "testCreateConsumerWhenSinglePartitionIsDeleted-" + allowAutoTopicCreation) - .toString(); - admin.topics().createPartitionedTopic(partitionedTopic, 3); - admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString()); - - // Non-persistent topic only have the metadata, and no partition, so it works fine. - if (allowAutoTopicCreation || domain.equals(TopicDomain.non_persistent)) { - @Cleanup - Consumer<byte[]> ignored = - pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe(); - } else { - assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> { - @Cleanup - Consumer<byte[]> ignored = - pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe(); - }); - } - } -} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java index cd75383a487..d5734588288 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java @@ -18,11 +18,8 @@ */ package org.apache.pulsar.client.api; -import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; -import lombok.Cleanup; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.TopicDomain; @@ -194,74 +191,4 @@ public class ProducerCreationTest extends ProducerConsumerBase { Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName)); } - - @Test(dataProvider = "topicDomainProvider") - public void testCreateProducerWhenTopicTypeMismatch(TopicDomain domain) - throws PulsarAdminException, PulsarClientException { - String nonPartitionedTopic = - TopicName.get(domain.value(), "public", "default", - "testCreateProducerWhenTopicTypeMismatch-nonPartitionedTopic") - .toString(); - admin.topics().createNonPartitionedTopic(nonPartitionedTopic); - - // Topic type is non-partitioned, trying to create producer on the complete partitioned topic. - // Should throw NotAllowedException. - assertThrows(NotAllowedException.class, () -> { - @Cleanup - Producer<byte[]> ignored = - pulsarClient.newProducer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString()) - .create(); - }); - - // Topic type is partitioned, trying to create producer on the base partitioned topic. - String partitionedTopic = TopicName.get(domain.value(), "public", "default", - "testCreateProducerWhenTopicTypeMismatch-partitionedTopic") - .toString(); - admin.topics().createPartitionedTopic(partitionedTopic, 3); - - // Works fine because the lookup can help our to find all the topics. - { - @Cleanup - Producer<byte[]> ignored = - pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartitionedTopicName()) - .create(); - } - - // Partition index is out of range. - assertThrows(NotAllowedException.class, () -> { - @Cleanup - Producer<byte[]> ignored = - pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString()) - .create(); - }); - } - - @Test(dataProvider = "topicDomainProvider") - public void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain domain) - throws PulsarAdminException, PulsarClientException { - testCreateProducerWhenSinglePartitionIsDeleted(domain, false); - testCreateProducerWhenSinglePartitionIsDeleted(domain, true); - } - - private void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain domain, boolean allowAutoTopicCreation) - throws PulsarAdminException, PulsarClientException { - conf.setAllowAutoTopicCreation(allowAutoTopicCreation); - - String partitionedTopic = TopicName.get(domain.value(), "public", "default", - "testCreateProducerWhenSinglePartitionIsDeleted-" + allowAutoTopicCreation) - .toString(); - admin.topics().createPartitionedTopic(partitionedTopic, 3); - admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString()); - - // Non-persistent topic only have the metadata, and no partition, so it works fine. - if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) { - @Cleanup - Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create(); - } else { - assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> { - @Cleanup - Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create(); - }); - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java index c4ef53b292b..59cb7ae03d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java @@ -19,19 +19,13 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; -import static org.assertj.core.api.Assertions.assertThat; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.util.Collection; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.testng.annotations.AfterClass; @@ -131,80 +125,4 @@ public class LookupServiceTest extends ProducerConsumerBase { admin.topics().delete(nonPartitionedTopic, false); } - @Test(dataProvider = "isUsingHttpLookup") - public void testGetPartitionedTopicMetadataByPulsarClient(boolean isUsingHttpLookup) throws PulsarAdminException { - LookupService lookupService = getLookupService(isUsingHttpLookup); - - // metadataAutoCreationEnabled is true. - assertThat(lookupService.getPartitionedTopicMetadata( - TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), true)) - .succeedsWithin(3, TimeUnit.SECONDS) - .matches(n -> n.partitions == 0); - - // metadataAutoCreationEnabled is true. - // Allow the get the metadata of single partition topic, because the auto-creation is enabled. - // But the producer/consumer is unavailable because the topic doesn't have the metadata. - assertThat(lookupService.getPartitionedTopicMetadata( - TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + "-partition-10"), - true)) - .succeedsWithin(3, TimeUnit.SECONDS) - .matches(n -> n.partitions == 0); - - Class<? extends Throwable> expectedExceptionClass = - isUsingHttpLookup ? PulsarClientException.NotFoundException.class : - PulsarClientException.TopicDoesNotExistException.class; - // metadataAutoCreationEnabled is false. - assertThat(lookupService.getPartitionedTopicMetadata( - TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")), false)) - .failsWithin(3, TimeUnit.SECONDS) - .withThrowableThat() - .withCauseInstanceOf(expectedExceptionClass); - - // metadataAutoCreationEnabled is false. - assertThat(lookupService.getPartitionedTopicMetadata( - TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") + "-partition-10"), - false)) - .failsWithin(3, TimeUnit.SECONDS) - .withThrowableThat() - .withCauseInstanceOf(expectedExceptionClass); - - // Verify the topic exists, and the metadataAutoCreationEnabled is false. - String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(nonPartitionedTopic); - assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(nonPartitionedTopic), false)) - .succeedsWithin(3, TimeUnit.SECONDS) - .matches(n -> n.partitions == 0); - - String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - String partitionedTopicWithPartitionIndex = partitionedTopic + "-partition-10"; - admin.topics().createPartitionedTopic(partitionedTopic, 20); - assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopic), false)) - .succeedsWithin(3, TimeUnit.SECONDS) - .matches(n -> n.partitions == 20); - assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopicWithPartitionIndex), false)) - .succeedsWithin(3, TimeUnit.SECONDS) - .matches(n -> n.partitions == 0); - } - - @Test - public void testGetPartitionedTopicMedataByAdmin() throws PulsarAdminException { - String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - String partitionedTopicWithPartitionIndex = partitionedTopic + "-partition-10"; - // No topic, so throw the NotFound. - // BTW: The admin api doesn't allow to creat the metadata of topic default. - assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics() - .getPartitionedTopicMetadata(nonPartitionedTopic)); - assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics() - .getPartitionedTopicMetadata(partitionedTopic)); - assertThrows(PulsarAdminException.NotFoundException.class, - () -> admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex)); - - admin.topics().createNonPartitionedTopic(nonPartitionedTopic); - assertEquals(admin.topics().getPartitionedTopicMetadata(nonPartitionedTopic).partitions, 0); - - admin.topics().createPartitionedTopic(partitionedTopic, 20); - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopic).partitions, 20); - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex).partitions, 0); - } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 023ac1263b7..5bb88bcbd0a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -992,9 +992,7 @@ public class PulsarClientException extends IOException { public static Throwable wrap(Throwable t, String msg) { msg += "\n" + t.getMessage(); // wrap an exception with new message info - if (t instanceof TopicDoesNotExistException) { - return new TopicDoesNotExistException(msg); - } else if (t instanceof TimeoutException) { + if (t instanceof TimeoutException) { return new TimeoutException(msg); } else if (t instanceof InvalidConfigurationException) { return new InvalidConfigurationException(msg);