void-ptr974 commented on code in PR #25946:
URL: https://github.com/apache/pulsar/pull/25946#discussion_r3368641089


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java:
##########
@@ -187,62 +224,132 @@ private void syncTailItems() throws 
InterruptedException, IOException, TimeoutEx
     private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView 
src)
             throws JsonProcessingException, ExecutionException, 
InterruptedException, TimeoutException {
         // Directly use store to sync existing items to 
metadataStoreTableView(otherwise, they are conflicted out)
-        var store = pulsar.getLocalMetadataStore();
-        var writer = ObjectMapperFactory.getMapper().writer();
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         var srcIter = src.entrySet().iterator();
         while (srcIter.hasNext()) {
             var e = srcIter.next();
-            
futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + 
"/" + e.getKey(),
-                    writer.writeValueAsBytes(e.getValue()), 
Optional.empty()).thenApply(__ -> null));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!srcIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            futures.add(writeToMetadataStore(e.getKey(), e.getValue()));
+            maybeWaitCompletion(futures, !srcIter.hasNext());
+        }
+    }
+
+    private void maybeWaitCompletion(List<CompletableFuture<Void>> futures, 
boolean forceWait)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        if (!futures.isEmpty() && (futures.size() == MAX_CONCURRENT_SYNC_COUNT 
|| forceWait)) {
+            FutureUtil.waitForAll(futures)
+                    
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+            futures.clear();
         }
     }
 
+    private @NonNull CompletableFuture<Void> writeToMetadataStore(String key, 
ServiceUnitStateData value)
+            throws JsonProcessingException {
+        return 
pulsar.getLocalMetadataStore().put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX
 + "/" + key,
+                jsonWriter.writeValueAsBytes(value), 
Optional.empty()).thenApply(__ -> null);
+    }
+
     private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src,
                                                 ServiceUnitStateTableView dst)
             throws ExecutionException, InterruptedException, TimeoutException {
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         var srcIter = src.entrySet().iterator();
         while (srcIter.hasNext()) {
             var e = srcIter.next();
             futures.add(dst.put(e.getKey(), e.getValue()));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!srcIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            maybeWaitCompletion(futures, !srcIter.hasNext());
         }
     }
 
     private void clean(ServiceUnitStateTableView dst)
             throws ExecutionException, InterruptedException, TimeoutException {
-        var opTimeout = 
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
         var dstIter = dst.entrySet().iterator();
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         while (dstIter.hasNext()) {
             var e = dstIter.next();
             futures.add(dst.delete(e.getKey()));
-            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || 
!dstIter.hasNext()) {
-                FutureUtil.waitForAll(futures).get(opTimeout, 
TimeUnit.SECONDS);
-            }
+            maybeWaitCompletion(futures, !dstIter.hasNext());
         }
     }
 
-    private boolean waitUntilSynced(ServiceUnitStateTableView srt, 
ServiceUnitStateTableView dst, long started)
+    private boolean waitUntilSynced(ServiceUnitStateTableView src, 
ServiceUnitStateTableView dst, long started)
             throws InterruptedException {
-        while (srt.entrySet().size() != dst.entrySet().size()) {
-            if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
started)
-                    > SYNC_WAIT_TIME_IN_SECS) {
+        long lastReconciled = started;
+        while (src.entrySet().size() != dst.entrySet().size()) {
+            long now = System.currentTimeMillis();
+            if (TimeUnit.MILLISECONDS.toSeconds(now - started) > 
syncWaitTimeInSecs) {
                 return false;
             }
+            // Give in-flight syncs a grace period to settle on their own, 
then reconcile
+            // periodically: updates that raced with the table views' 
(re)start were replayed
+            // to the fresh views as existing items — which are deliberately 
not wired to
+            // sync — so without reconciliation the views would never converge.
+            if (now - lastReconciled >= RECONCILE_INTERVAL_IN_MILLIS) {
+                log.debug().attr("srcSize", 
src.entrySet().size()).attr("dstSize", dst.entrySet().size())
+                        .attr("elapsedSecs", 
TimeUnit.MILLISECONDS.toSeconds(now - started))
+                        .log("Tableviews not synced yet; reconciling");
+                reconcile(src, dst, started);
+                lastReconciled = now;
+            }
             Thread.sleep(100);
         }
         return true;
     }
 
+    /**
+     * Copies items the destination table view is missing and removes stale 
items that no longer
+     * exist in the source. Channel updates that land between the 
existing-items copy and the
+     * registration of the tail listeners are only visible as existing items 
of the freshly
+     * started views, so the tail listeners never see them. Writes flow to the 
migration source
+     * while the syncer starts, making the source view authoritative; 
destination-only items are
+     * removed only when they predate this sync phase and are still absent 
from the source, so a
+     * concurrent fresh write to the destination is never discarded. Failures 
are logged and left
+     * for the next reconcile pass. Runs on the caller's (load manager) thread 
with each batch
+     * bounded by the metadata store operation timeout.
+     */
+    private void reconcile(ServiceUnitStateTableView src, 
ServiceUnitStateTableView dst, long started)
+            throws InterruptedException {
+        // Snapshot the destination entries before iterating the source so 
that a key arriving
+        // in the destination through a concurrent tail sync cannot be 
misclassified as stale.
+        var staleDstItems = new HashMap<String, ServiceUnitStateData>();
+        for (var e : dst.entrySet()) {
+            staleDstItems.put(e.getKey(), e.getValue());
+        }
+        try {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (var e : src.entrySet()) {
+                if (staleDstItems.remove(e.getKey()) == null) {
+                    log.info().attr("serviceUnit", e.getKey())
+                            .log("Reconciling item missing from the 
destination tableview");
+                    if (dst.isMetadataStoreBased()) {
+                        // Write directly to the store like 
syncExistingItemsToMetadataStore
+                        // does; the view's put() would conflict the item out.
+                        futures.add(writeToMetadataStore(e.getKey(), 
e.getValue()));
+                    } else {
+                        futures.add(dst.put(e.getKey(), e.getValue()));
+                    }
+                    maybeWaitCompletion(futures, false);
+                }
+            }
+            for (var e : staleDstItems.entrySet()) {
+                // Only remove items written before this sync phase began and 
re-confirmed absent
+                // from the source: a fresh destination write (e.g. from a 
broker already switched
+                // to the destination implementation) is propagated to the 
source by the tail
+                // listener instead of being deleted here.
+                if (e.getValue().timestamp() < started && src.get(e.getKey()) 
== null) {

Review Comment:
   Got it, thanks for clarifying. If this is meant to be a best-effort, one-way migration helper, then my concern about destination-only items shouldn’t block this PR.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to