This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5699d04ea42 [fix][meta] PIP-454: fix stale migration status reported
after completion (#25993)
5699d04ea42 is described below
commit 5699d04ea4254d478a668ba1c5b70c214d2b1c3f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 10 11:41:16 2026 -0700
[fix][meta] PIP-454: fix stale migration status reported after completion
(#25993)
---
.../broker/admin/impl/MetadataMigrationBase.java | 10 +++++-
.../pulsar/common/migration/MigrationState.java | 10 ++++--
.../coordination/impl/MigrationCoordinator.java | 7 ++++
.../pulsar/metadata/MigrationCoordinatorTest.java | 40 ++++++++++++++++++++++
4 files changed, 64 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
index 12238200848..7b6ea44daa5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.migration.MigrationState;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
import org.apache.pulsar.metadata.impl.DualMetadataStore;
@@ -53,7 +54,14 @@ public class MetadataMigrationBase extends AdminResource {
validateSuperUserAccess();
try {
- var ogr =
pulsar().getLocalMetadataStore().get(MigrationState.MIGRATION_FLAG_PATH).get();
+ // The migration flag lives in the source store. Don't read it
through the
+ // DualMetadataStore: once the migration is completed, its reads
are routed to the
+ // target store, which doesn't hold the flag.
+ MetadataStore store = pulsar().getLocalMetadataStore();
+ if (store instanceof DualMetadataStore dualStore) {
+ store = dualStore.getSourceStore();
+ }
+ var ogr = store.get(MigrationState.MIGRATION_FLAG_PATH).get();
if (ogr.isPresent()) {
return
ObjectMapperFactory.getMapper().reader().readValue(ogr.get().getValue(),
MigrationState.class);
} else {
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
index e634c8c320c..d0319c741b5 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/migration/MigrationState.java
@@ -41,6 +41,12 @@ public class MigrationState {
- public static final String MIGRATION_FLAG_PATH =
"/pulsar/migration-coordinator/migration";
- public static final String PARTICIPANTS_PATH =
"/pulsar/migration-coordinator/participants";
+ /**
+ * Root of the migration coordination state. This subtree is only
meaningful in the source
+ * store and is never copied into the target store.
+ */
+ public static final String COORDINATOR_PATH =
"/pulsar/migration-coordinator";
+
+ public static final String MIGRATION_FLAG_PATH = COORDINATOR_PATH +
"/migration";
+ public static final String PARTICIPANTS_PATH = COORDINATOR_PATH +
"/participants";
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
index 93dd95a9d87..d9d3bdf75a5 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
@@ -200,6 +200,13 @@ public class MigrationCoordinator {
}
}
+ if (path.equals(MigrationState.COORDINATOR_PATH)
+ || path.startsWith(MigrationState.COORDINATOR_PATH + "/"))
{
+ // The migration coordination state is only meaningful in the
source store. Copying it
+ // would leave a permanently stale migration flag in the
target store.
+ continue;
+ }
+
semaphore.acquire();
copy(path).whenComplete((res, e) -> {
semaphore.release();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
index 2a6b99bdcca..4ff03179275 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
@@ -40,6 +41,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
+import org.apache.pulsar.metadata.impl.DualMetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -275,6 +277,44 @@ public class MigrationCoordinatorTest extends
BaseMetadataStoreTest {
assertEquals(new String(target3.get().getValue(),
StandardCharsets.UTF_8), "value3");
}
+ @Test
+ public void testStatusReportsCompletedAfterMigration() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ DualMetadataStore sourceStore = (DualMetadataStore)
MetadataStoreFactory.create(
+ "zk:" + zks.getConnectionString(),
MetadataStoreConfig.builder().build());
+
+ String targetUrl = getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
MetadataStoreConfig.builder().build());
+
+ String key = prefix + "/key1";
+ sourceStore.put(key, "value1".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
+
+ // Run the migration through the DualMetadataStore, like the broker
admin endpoint does
+ MigrationCoordinator coordinator = new
MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ // The source store holds the authoritative migration state. This is
what the status
+ // endpoint reports, since reads through the DualMetadataStore are
routed to the target
+ // store once the migration is completed.
+ Optional<GetResult> result =
sourceStore.getSourceStore().get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+ // Regular data was copied
+ assertTrue(targetStore.get(key).join().isPresent());
+
+ // The migration coordination state was not copied: a copied flag
would permanently
+ // report the stale phase (COPYING) it had at copy time
+
assertFalse(targetStore.get(MigrationState.MIGRATION_FLAG_PATH).join().isPresent());
+
assertFalse(targetStore.exists(MigrationState.COORDINATOR_PATH).join());
+ }
+
@Test
public void testMigrationStateStructure() throws Exception {
@Cleanup