lhotari commented on code in PR #25946:
URL: https://github.com/apache/pulsar/pull/25946#discussion_r3368189852


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java:
##########
@@ -187,62 +224,132 @@ private void syncTailItems() throws 
InterruptedException, IOException, TimeoutEx
     private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView 
src)
             throws JsonProcessingException, ExecutionException, 
InterruptedException, TimeoutException {
         // Directly use store to sync existing items to 
metadataStoreTableView(otherwise, they are conflicted out)
-        var store = pulsar.getLocalMetadataStore();
-        var writer = ObjectMapperFactory.getMapper().writer();
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         var srcIter = src.entrySet().iterator();
         while (srcIter.hasNext()) {
             var e = srcIter.next();
-            
futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + 
"/" + e.getKey(),
-                    writer.writeValueAsBytes(e.getValue()), 
Optional.empty()).thenApply(__ -> null));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!srcIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            futures.add(writeToMetadataStore(e.getKey(), e.getValue()));
+            maybeWaitCompletion(futures, !srcIter.hasNext());
+        }
+    }
+
+    private void maybeWaitCompletion(List<CompletableFuture<Void>> futures, 
boolean forceWait)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        if (!futures.isEmpty() && (futures.size() == MAX_CONCURRENT_SYNC_COUNT 
|| forceWait)) {
+            FutureUtil.waitForAll(futures)
+                    
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+            futures.clear();
         }
     }
 
+    private @NonNull CompletableFuture<Void> writeToMetadataStore(String key, 
ServiceUnitStateData value)
+            throws JsonProcessingException {
+        return 
pulsar.getLocalMetadataStore().put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX
 + "/" + key,
+                jsonWriter.writeValueAsBytes(value), 
Optional.empty()).thenApply(__ -> null);
+    }
+
     private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src,
                                                 ServiceUnitStateTableView dst)
             throws ExecutionException, InterruptedException, TimeoutException {
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         var srcIter = src.entrySet().iterator();
         while (srcIter.hasNext()) {
             var e = srcIter.next();
             futures.add(dst.put(e.getKey(), e.getValue()));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!srcIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            maybeWaitCompletion(futures, !srcIter.hasNext());
         }
     }
 
     private void clean(ServiceUnitStateTableView dst)
             throws ExecutionException, InterruptedException, TimeoutException {
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         var dstIter = dst.entrySet().iterator();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         while (dstIter.hasNext()) {
             var e = dstIter.next();
             futures.add(dst.delete(e.getKey()));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!dstIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            maybeWaitCompletion(futures, !dstIter.hasNext());
         }
     }
 
-    private boolean waitUntilSynced(ServiceUnitStateTableView srt, 
ServiceUnitStateTableView dst, long started)
+    private boolean waitUntilSynced(ServiceUnitStateTableView src, 
ServiceUnitStateTableView dst, long started)
             throws InterruptedException {
-        while (srt.entrySet().size() != dst.entrySet().size()) {
-            if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
started)
-                    > SYNC_WAIT_TIME_IN_SECS) {
+        long lastReconciled = started;
+        while (src.entrySet().size() != dst.entrySet().size()) {
+            long now = System.currentTimeMillis();
+            if (TimeUnit.MILLISECONDS.toSeconds(now - started) > 
syncWaitTimeInSecs) {
                 return false;
             }
+            // Give in-flight syncs a grace period to settle on their own, 
then reconcile
+            // periodically: updates that raced with the table views' 
(re)start were replayed
+            // to the fresh views as existing items — which are deliberately 
not wired to
+            // sync — so without reconciliation the views would never converge.
+            if (now - lastReconciled >= RECONCILE_INTERVAL_IN_MILLIS) {
+                log.debug().attr("srcSize", 
src.entrySet().size()).attr("dstSize", dst.entrySet().size())
+                        .attr("elapsedSecs", 
TimeUnit.MILLISECONDS.toSeconds(now - started))
+                        .log("Tableviews not synced yet; reconciling");
+                reconcile(src, dst, started);
+                lastReconciled = now;
+            }
             Thread.sleep(100);
         }
         return true;
     }
 
+    /**
+     * Copies items the destination table view is missing and removes stale 
items that no longer
+     * exist in the source. Channel updates that land between the 
existing-items copy and the
+     * registration of the tail listeners are only visible as existing items 
of the freshly
+     * started views, so the tail listeners never see them. Writes flow to the 
migration source
+     * while the syncer starts, making the source view authoritative; 
destination-only items are
+     * removed only when they predate this sync phase and are still absent 
from the source, so a
+     * concurrent fresh write to the destination is never discarded. Failures 
are logged and left
+     * for the next reconcile pass. Runs on the caller's (load manager) thread 
with each batch
+     * bounded by the metadata store operation timeout.
+     */
+    private void reconcile(ServiceUnitStateTableView src, 
ServiceUnitStateTableView dst, long started)
+            throws InterruptedException {
+        // Snapshot the destination entries before iterating the source so 
that a key arriving
+        // in the destination through a concurrent tail sync cannot be 
misclassified as stale.
+        var staleDstItems = new HashMap<String, ServiceUnitStateData>();
+        for (var e : dst.entrySet()) {
+            staleDstItems.put(e.getKey(), e.getValue());
+        }
+        try {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (var e : src.entrySet()) {
+                if (staleDstItems.remove(e.getKey()) == null) {
+                    log.info().attr("serviceUnit", e.getKey())
+                            .log("Reconciling item missing from the 
destination tableview");
+                    if (dst.isMetadataStoreBased()) {
+                        // Write directly to the store like 
syncExistingItemsToMetadataStore
+                        // does; the view's put() would conflict the item out.
+                        futures.add(writeToMetadataStore(e.getKey(), 
e.getValue()));
+                    } else {
+                        futures.add(dst.put(e.getKey(), e.getValue()));
+                    }
+                    maybeWaitCompletion(futures, false);
+                }
+            }
+            for (var e : staleDstItems.entrySet()) {
+                // Only remove items written before this sync phase began and 
re-confirmed absent
+                // from the source: a fresh destination write (e.g. from a 
broker already switched
+                // to the destination implementation) is propagated to the 
source by the tail
+                // listener instead of being deleted here.
+                if (e.getValue().timestamp() < started && src.get(e.getKey()) 
== null) {

Review Comment:
   Quick comment: the migration is one-way (src -> dst); there is no intention 
to copy items back. I agree that timestamps are brittle here.
   
   It seems that making this migration tool perfect would require some 
redesign; the initial design appears to be best-effort.
   The use case for this migration solution is rare: it is meant for online 
migration from the system-topic-based ELB to the metadata-store-based ELB.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to