This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 951a426dd2a [test][broker] PIP-475: end-to-end migration tests + transition fixes (#25878)
951a426dd2a is described below

commit 951a426dd2a1ecc645c069afb666b82176dd88ed
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 13:05:56 2026 -0700

    [test][broker] PIP-475: end-to-end migration tests + transition fixes (#25878)
---
 .../pulsar/broker/admin/v2/ScalableTopics.java     |  22 ++--
 .../broker/service/scalable/DagWatchSession.java   |  16 ++-
 .../client/api/v5/V5MigrationEndToEndTest.java     | 117 +++++++++++++++++++++
 .../client/impl/v5/ScalableTopicProducer.java      |  86 +++++++++++----
 .../topics/TestScalableTopicMigration.java         | 105 ++++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |   1 +
 6 files changed, 314 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index f76d9a11ff2..8c310c803ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -339,12 +339,22 @@ public class ScalableTopics extends AdminResource {
         } catch (Exception e) {
             return CompletableFuture.failedFuture(e);
         }
-        CompletableFuture<? extends TopicStats> statsFuture =
-                partitions > 0
-                        ? 
admin.topics().getPartitionedStatsAsync(persistentBase.toString(), false)
-                        : 
admin.topics().getStatsAsync(persistentBase.toString());
-        return statsFuture.thenAccept(stats -> {
-            long legacy = countLegacyConnections(stats);
+        // For a partitioned topic, inspect per-partition stats rather than 
the aggregate:
+        // aggregation merges publishers by producer name into fresh stat 
objects that drop
+        // per-connection metadata, which would hide the V5-managed marker and 
make every
+        // V5 connection look like a legacy v4 one.
+        final CompletableFuture<Long> legacyCount = partitions > 0
+                ? 
admin.topics().getPartitionedStatsAsync(persistentBase.toString(), true)
+                        .thenApply(stats -> {
+                            long count = 0;
+                            for (TopicStats partitionStats : 
stats.getPartitions().values()) {
+                                count += 
countLegacyConnections(partitionStats);
+                            }
+                            return count;
+                        })
+                : admin.topics().getStatsAsync(persistentBase.toString())
+                        .thenApply(ScalableTopics::countLegacyConnections);
+        return legacyCount.thenAccept(legacy -> {
             if (legacy > 0) {
                 throw new RestException(Response.Status.CONFLICT,
                         legacy + " legacy v4 client connection(s) still 
attached to " + persistentBase
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 350125438e7..fe033b23336 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -63,8 +63,11 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
     private final BrokerService brokerService;
 
     private final String metadataPath;
-    /** Canonical {@code topic://...} identity returned to the client 
regardless of the
-     *  input form ({@code topic://}, {@code persistent://}, or short-form). */
+    /** Canonical {@code topic://...} identity, regardless of the input form 
({@code topic://},
+     *  {@code persistent://}, or short-form). Used both as the {@code 
resolved_topic_name}
+     *  reported to the client and as the parent when computing {@code 
segment://} URIs for a
+     *  real DAG (those require the {@code topic://} domain). */
+    private final TopicName scalableTopicName;
     private final String resolvedTopicName;
     private volatile boolean closed = false;
 
@@ -79,7 +82,8 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
         this.resources = resources;
         this.brokerService = brokerService;
         this.metadataPath = resources.topicPath(topicName);
-        this.resolvedTopicName = topicName.toScalableTopic().toString();
+        this.scalableTopicName = topicName.toScalableTopic();
+        this.resolvedTopicName = scalableTopicName.toString();
         this.log = LOG.with().attr("topic", topicName).attr("sessionId", 
sessionId).build();
     }
 
@@ -341,9 +345,11 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
         Map<Long, String> result = new LinkedHashMap<>();
         CompletableFuture<?>[] futures = 
layout.getActiveSegments().values().stream()
                 .map(segment -> {
-                    // Resolve which broker owns this segment's underlying 
segment:// topic
+                    // Resolve which broker owns this segment's underlying 
segment:// topic.
+                    // SegmentTopicName.fromParent requires the topic:// 
domain, so use the
+                    // canonical scalable name (the session's input may be 
persistent://).
                     TopicName segTn = 
org.apache.pulsar.common.scalable.SegmentTopicName.fromParent(
-                            topicName, segment.hashRange(), 
segment.segmentId());
+                            scalableTopicName, segment.hashRange(), 
segment.segmentId());
                     var lookupOptions = 
org.apache.pulsar.broker.namespace.LookupOptions.builder()
                             .readOnly(false).authoritative(false).build();
                     return brokerService.getPulsar().getNamespaceService()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
new file mode 100644
index 00000000000..6eaa2cfb406
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.expectThrows;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for the PIP-475 regular-to-scalable migration, exercising 
the full
+ * operator timeline against a live broker: V5 clients operate on a regular 
topic via the
+ * synthetic layout, the operator migrates, and the same clients transparently 
transition
+ * to the real DAG with no data loss across the migration boundary.
+ */
+public class V5MigrationEndToEndTest extends V5ClientBaseTest {
+
+    private String baseName(String suffix) {
+        return getNamespace() + "/" + suffix + "-" + 
UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    @Test
+    public void testV5ProducerSurvivesMigrationAndAllDataIsConsumable() throws 
Exception {
+        // Timeline:
+        //  1. A 2-partition regular topic exists.
+        //  2. A V5 producer publishes batch #1 through the synthetic layout 
(mod-N routing
+        //     to the legacy segments == the partitions).
+        //  3. The operator migrates the topic (only the V5 producer is 
attached, and it
+        //     carries the V5-managed marker, so the precheck passes without 
--force).
+        //  4. The same V5 producer publishes batch #2 — it has transparently 
transitioned to
+        //     the real DAG and now range-routes to the new active child 
segments.
+        //  5. A V5 queue consumer reading EARLIEST drains everything: batch 
#1 from the sealed
+        //     legacy parents, batch #2 from the active children.
+        String topic = baseName("e2e");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic("persistent://" + topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 20; i++) {
+            String v = "pre-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Migrate. The attached V5 producer is marked, so no legacy 
connections are seen.
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // The producer transparently follows the layout change to the real 
DAG.
+        for (int i = 0; i < 20; i++) {
+            String v = "post-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic("persistent://" + topic)
+                .subscriptionName("e2e-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < 40; i++) {
+            org.apache.pulsar.client.api.v5.Message<String> m = 
consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(m, "expected 40 messages, missing after " + 
received.size());
+            received.add(m.value());
+            consumer.acknowledge(m.id());
+        }
+        assertEquals(received, sent, "every pre- and post-migration message 
must be consumable");
+    }
+
+    @Test
+    public void testV4ProducerLockedOutAfterMigration() throws Exception {
+        // After migration the old topic is terminated, so a legacy v4 
producer can no longer
+        // write to it — the produce fails with a terminated-topic error.
+        String topic = baseName("lockout");
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // A v4 producer either fails to create on, or fails to send to, the 
now-terminated
+        // (and scalable-shadowed) persistent:// topic.
+        expectThrows(org.apache.pulsar.client.api.PulsarClientException.class, 
() -> {
+            org.apache.pulsar.client.api.Producer<byte[]> v4Producer = 
pulsarClient.newProducer()
+                    .topic("persistent://" + topic)
+                    .create();
+            v4Producer.send("blocked".getBytes());
+        });
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index dd1540feee7..5c7a2633e47 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -47,6 +47,13 @@ import 
org.apache.pulsar.common.scalable.ScalableTopicConstants;
 final class ScalableTopicProducer<T> implements Producer<T>, 
DagWatchClient.LayoutChangeListener {
 
     private static final Logger LOG = Logger.get(ScalableTopicProducer.class);
+
+    /** Max attempts for a send when the target segment is gone (split/merge 
seal or migration
+     *  termination), giving the DAG watch time to deliver the new layout 
before giving up. */
+    private static final int SEND_RETRY_MAX_ATTEMPTS = 10;
+    /** Cap on the per-attempt backoff while waiting for the new layout. */
+    private static final long SEND_RETRY_MAX_BACKOFF_MS = 500L;
+
     private final Logger log;
 
     private final PulsarClientV5 client;
@@ -181,38 +188,73 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
             java.util.List<String> replicationClusters,
             org.apache.pulsar.client.api.v5.Transaction txn) throws 
PulsarClientException {
 
-        for (int attempt = 0; attempt < 3; attempt++) {
+        PulsarClientException lastError = null;
+        for (int attempt = 0; attempt < SEND_RETRY_MAX_ATTEMPTS; attempt++) {
             long segmentId = routeMessage(key);
-            var producer = getOrCreateSegmentProducer(segmentId);
-
             try {
+                var producer = getOrCreateSegmentProducer(segmentId);
                 var v4MsgId = buildV4Message(producer, key, value, properties,
                         eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters, txn)
                         .send();
                 return new MessageIdV5(v4MsgId, segmentId);
-            } catch 
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException
-                     | 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException e) {
-                // The segment was sealed (split/merge). We may observe this 
either as
-                // TopicTerminated (broker reply to a still-open producer) or 
AlreadyClosed
-                // (the v4 producer noticed first and shut itself down). 
Either way, drop
-                // the stale per-segment producer and retry — the DAG watch 
will deliver
-                // the new layout shortly, and routeMessage on the next 
attempt will land
-                // on an active child.
-                log.info().attr("segmentId", segmentId)
-                        .attr("attempt", attempt + 1)
-                        .log("Segment sealed, waiting for layout update");
-                segmentProducers.remove(segmentId);
-                try {
-                    Thread.sleep(100L * (attempt + 1));
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw new PulsarClientException("Interrupted while waiting 
for layout update", ie);
+            } catch (PulsarClientException e) {
+                // Thrown while (re)creating the per-segment producer — 
already a V5 exception
+                // (it may wrap a v4 TopicTerminated/AlreadyClosed cause).
+                if (!isSegmentGoneError(e)) {
+                    throw e;
                 }
+                lastError = e;
             } catch (org.apache.pulsar.client.api.PulsarClientException e) {
-                throw new PulsarClientException(e.getMessage(), e);
+                // Thrown by the v4 producer's send().
+                if (!isSegmentGoneError(e)) {
+                    throw new PulsarClientException(e.getMessage(), e);
+                }
+                lastError = new PulsarClientException(e.getMessage(), e);
+            }
+            // The target segment is gone: sealed by a split/merge, or 
terminated by a
+            // regular-to-scalable migration. Drop the stale per-segment 
producer and wait
+            // for the DAG watch to deliver the new layout; routeMessage on 
the next attempt
+            // lands on an active child.
+            log.info().attr("segmentId", segmentId).attr("attempt", attempt + 
1)
+                    .log("Target segment gone, waiting for layout update");
+            segmentProducers.remove(segmentId);
+            try {
+                Thread.sleep(Math.min(100L * (attempt + 1), 
SEND_RETRY_MAX_BACKOFF_MS));
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new PulsarClientException("Interrupted while waiting for 
layout update", ie);
+            }
+        }
+        throw lastError != null ? lastError
+                : new PulsarClientException("Failed to send after segment 
termination retries");
+    }
+
+    /**
+     * True if {@code t} (or one of its causes) signals that the target 
segment is gone —
+     * sealed by a split/merge or terminated by a regular-to-scalable 
migration — so the send
+     * should be retried once the new layout arrives. Handles both the v4 
exceptions thrown by
+     * {@code send()} and the V5-wrapped exceptions thrown while (re)creating 
the per-segment
+     * producer on a now-terminated topic.
+     */
+    private static boolean isSegmentGoneError(Throwable t) {
+        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
+            if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException) {
+                return true;
+            }
+            if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException) {
+                return true;
+            }
+            // The per-segment producer-creation path can surface the broker's 
terminated /
+            // already-closed error as a plain (untyped) PulsarClientException 
whose message
+            // carries the server-side class name; match on that too.
+            String msg = cause.getMessage();
+            if (msg != null
+                    && (msg.contains("TopicTerminated") || 
msg.contains("already terminated")
+                        || msg.contains("AlreadyClosed"))) {
+                return true;
             }
         }
-        throw new PulsarClientException("Failed to send after segment 
termination retries");
+        return false;
     }
 
     /**
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestScalableTopicMigration.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestScalableTopicMigration.java
new file mode 100644
index 00000000000..b3e7d5d5249
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestScalableTopicMigration.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topics;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.function.Supplier;
+import lombok.CustomLog;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the PIP-475 regular-to-scalable topic migration, 
exercising the
+ * {@code pulsar-admin scalable-topics migrate} command against a real 
multi-broker cluster
+ * (real metadata store, BookKeeper, and cross-broker bundle ownership).
+ *
+ * <p>The V5-client transparent transition across the migration boundary is 
covered by the
+ * in-process broker test {@code V5MigrationEndToEndTest}; this test focuses 
on what the
+ * dockerized cluster adds: that the migration command, its CLI wiring, the 
resulting scalable
+ * metadata, and the post-migration termination of the old topic all behave in 
a real
+ * deployment.
+ */
+@CustomLog
+public class TestScalableTopicMigration extends PulsarTestSuite {
+
+    private final int numBrokers = 2;
+
+    public void setupCluster() throws Exception {
+        this.setupCluster("");
+    }
+
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName,
+            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        specBuilder.numBrokers(numBrokers);
+        specBuilder.enableContainerLog(true);
+        return specBuilder;
+    }
+
+    @Test(dataProvider = "ServiceUrls", timeOut = 300_000)
+    public void testMigrateRegularTopicToScalable(Supplier<String> serviceUrl) 
throws Exception {
+        final String nsName = "mig-" + randomName(6);
+        final String namespace = "public/" + nsName;
+        final String shortTopic = namespace + "/regular";
+        final String topic = "persistent://" + shortTopic;
+        final int numPartitions = numBrokers * 2;
+
+        pulsarCluster.createNamespace(nsName);
+        pulsarCluster.createPartitionedTopic(topic, numPartitions);
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
+            // Seed data on the regular partitioned topic (lands across the 
partitions).
+            try (Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
+                for (int i = 0; i < 50; i++) {
+                    producer.newMessage().key("k-" + i).value(("v-" + 
i).getBytes(UTF_8)).send();
+                }
+            }
+
+            // Migrate via the admin CLI.
+            ContainerExecResult migrate = 
pulsarCluster.runAdminCommandOnAnyBroker(
+                    "scalable-topics", "migrate", topic);
+            assertEquals(migrate.getExitCode(), 0L, "migrate failed: " + 
migrate.getStderr());
+
+            // The topic is now scalable: get-metadata returns its segment DAG.
+            ContainerExecResult metadata = 
pulsarCluster.runAdminCommandOnAnyBroker(
+                    "scalable-topics", "get-metadata", shortTopic);
+            assertEquals(metadata.getExitCode(), 0L, "get-metadata failed: " + 
metadata.getStderr());
+            assertTrue(metadata.getStdout().contains("segmentId"),
+                    "scalable metadata should list segments, got: " + 
metadata.getStdout());
+
+            // v4 lockout: the old partitions are terminated, so a legacy v4 
producer can no
+            // longer write to the topic.
+            try (Producer<byte[]> blocked = 
client.newProducer().topic(topic).create()) {
+                blocked.send("blocked".getBytes(UTF_8));
+                fail("v4 produce to a migrated (terminated) topic must fail");
+            } catch (PulsarClientException expected) {
+                log.info().exceptionMessage(expected)
+                        .log("v4 producer correctly locked out after 
migration");
+            }
+        }
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index a34670267dc..e3fd40e9978 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -30,6 +30,7 @@
             <class 
name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest"
 />
             <class 
name="org.apache.pulsar.tests.integration.messaging.MessagingSmokeTest" />
             <class name="org.apache.pulsar.tests.integration.admin.AdminTest" 
/>
+            <class 
name="org.apache.pulsar.tests.integration.topics.TestScalableTopicMigration" />
 
             <class 
name="org.apache.pulsar.tests.integration.oxia.OxiaSmokeTest" />
         </classes>

Reply via email to