This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5699d04ea42 [fix][meta] PIP-454: fix stale migration status reported after completion (#25993)
5699d04ea42 is described below

commit 5699d04ea4254d478a668ba1c5b70c214d2b1c3f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 10 11:41:16 2026 -0700

    [fix][meta] PIP-454: fix stale migration status reported after completion (#25993)
---
 .../broker/admin/impl/MetadataMigrationBase.java   | 10 +++++-
 .../pulsar/common/migration/MigrationState.java    | 10 ++++--
 .../coordination/impl/MigrationCoordinator.java    |  7 ++++
 .../pulsar/metadata/MigrationCoordinatorTest.java  | 40 ++++++++++++++++++++++
 4 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
index 12238200848..7b6ea44daa5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.migration.MigrationState;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
 import org.apache.pulsar.metadata.impl.DualMetadataStore;
 
@@ -53,7 +54,14 @@ public class MetadataMigrationBase extends AdminResource {
         validateSuperUserAccess();
 
         try {
-            var ogr = 
pulsar().getLocalMetadataStore().get(MigrationState.MIGRATION_FLAG_PATH).get();
+            // The migration flag lives in the source store. Don't read it through the
+            // DualMetadataStore: once the migration is completed, its reads are routed to the
+            // target store, which doesn't hold the flag.
+            MetadataStore store = pulsar().getLocalMetadataStore();
+            if (store instanceof DualMetadataStore dualStore) {
+                store = dualStore.getSourceStore();
+            }
+            var ogr = store.get(MigrationState.MIGRATION_FLAG_PATH).get();
             if (ogr.isPresent()) {
                 return 
ObjectMapperFactory.getMapper().reader().readValue(ogr.get().getValue(), 
MigrationState.class);
             } else {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
index e634c8c320c..d0319c741b5 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
@@ -41,6 +41,12 @@ public class MigrationState {
 
 
 
-    public static final String MIGRATION_FLAG_PATH = 
"/pulsar/migration-coordinator/migration";
-    public static final String PARTICIPANTS_PATH = 
"/pulsar/migration-coordinator/participants";
+    /**
+     * Root of the migration coordination state. This subtree is only meaningful in the source
+     * store and is never copied into the target store.
+     */
+    public static final String COORDINATOR_PATH = 
"/pulsar/migration-coordinator";
+
+    public static final String MIGRATION_FLAG_PATH = COORDINATOR_PATH + 
"/migration";
+    public static final String PARTICIPANTS_PATH = COORDINATOR_PATH + 
"/participants";
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
index 93dd95a9d87..d9d3bdf75a5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
@@ -200,6 +200,13 @@ public class MigrationCoordinator {
                 }
             }
 
+            if (path.equals(MigrationState.COORDINATOR_PATH)
+                    || path.startsWith(MigrationState.COORDINATOR_PATH + "/")) 
{
+                // The migration coordination state is only meaningful in the source store. Copying it
+                // would leave a permanently stale migration flag in the target store.
+                continue;
+            }
+
             semaphore.acquire();
             copy(path).whenComplete((res, e) -> {
                 semaphore.release();
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
index 2a6b99bdcca..4ff03179275 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
@@ -40,6 +41,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
+import org.apache.pulsar.metadata.impl.DualMetadataStore;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -275,6 +277,44 @@ public class MigrationCoordinatorTest extends BaseMetadataStoreTest {
         assertEquals(new String(target3.get().getValue(), 
StandardCharsets.UTF_8), "value3");
     }
 
+    @Test
+    public void testStatusReportsCompletedAfterMigration() throws Exception {
+        String prefix = newKey();
+
+        @Cleanup
+        DualMetadataStore sourceStore = (DualMetadataStore) 
MetadataStoreFactory.create(
+                "zk:" + zks.getConnectionString(), 
MetadataStoreConfig.builder().build());
+
+        String targetUrl = getOxiaServerConnectString();
+
+        @Cleanup
+        MetadataStore targetStore = MetadataStoreFactory.create(targetUrl, 
MetadataStoreConfig.builder().build());
+
+        String key = prefix + "/key1";
+        sourceStore.put(key, "value1".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
+
+        // Run the migration through the DualMetadataStore, like the broker admin endpoint does
+        MigrationCoordinator coordinator = new 
MigrationCoordinator(sourceStore, targetUrl);
+        coordinator.startMigration();
+
+        // The source store holds the authoritative migration state. This is what the status
+        // endpoint reports, since reads through the DualMetadataStore are routed to the target
+        // store once the migration is completed.
+        Optional<GetResult> result = 
sourceStore.getSourceStore().get(MigrationState.MIGRATION_FLAG_PATH).join();
+        assertTrue(result.isPresent());
+        MigrationState state = ObjectMapperFactory.getMapper().reader()
+                .readValue(result.get().getValue(), MigrationState.class);
+        assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+        // Regular data was copied
+        assertTrue(targetStore.get(key).join().isPresent());
+
+        // The migration coordination state was not copied: a copied flag would permanently
+        // report the stale phase (COPYING) it had at copy time
+        
assertFalse(targetStore.get(MigrationState.MIGRATION_FLAG_PATH).join().isPresent());
+        
assertFalse(targetStore.exists(MigrationState.COORDINATOR_PATH).join());
+    }
+
     @Test
     public void testMigrationStateStructure() throws Exception {
         @Cleanup

Reply via email to