sijie commented on a change in pull request #1577: Metadata Update mechanism
URL: https://github.com/apache/bookkeeper/pull/1577#discussion_r207793427
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MetadataUpdateLoop.java
 ##########
 @@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.bookkeeper.meta.LedgerManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mechanism to safely update the metadata of a ledger.
+ *
+ * <p>The loop takes the following steps:
+ * 1. Check if the metadata needs to be changed.
+ * 2. Make a copy of the metadata and modify it.
+ * 3. Write the modified copy to zookeeper.
+ * 3.1 If the write succeeds, go to 6.
+ * 3.2 If the write fails because of a failed compare and swap, go to 4.
+ * 4. Read the metadata back from the store
+ * 5. Update the local copy of the metadata with the metadata read in 4, go to 
1.
+ * 6. Update the local copy of the metadata with the metadata which has just 
been written.
+ *
+ * <p>All mutating operations are compare and swap operation. If the compare 
fails, another
+ * iteration of the loop begins.
+ */
+class MetadataUpdateLoop {
+    static final Logger LOG = 
LoggerFactory.getLogger(MetadataUpdateLoop.class);
+
+    private final LedgerManager lm;
+    private final long ledgerId;
+    private final Supplier<LedgerMetadata> currentLocalValue;
+    private final NeedsUpdatePredicate needsTransformation;
+    private final Function<LedgerMetadata, LedgerMetadata> transform;
+    private final BiPredicate<LedgerMetadata, LedgerMetadata> updateLocalValue;
+
+    private final String logContext;
+    private final AtomicInteger writeLoopCounter = new AtomicInteger(0);
+
+    interface NeedsUpdatePredicate {
+        boolean needsUpdate(LedgerMetadata metadata) throws Exception;
+    }
+
+    /**
+     * Construct the loop. This takes a set of functions which may be called 
multiple times
+     * during the loop.
+     *
+     * @param currentLocalValue should return the current local value of the 
metadata.
+     * @param needsTransformation should return true, if the metadata needs to 
be modified.
+     *                            should throw an exception, if this update 
doesn't make sense.
+     * @param transform takes a metadata objects, transforms, and returns it, 
without modifying
+     *                  the original.
+     * @param updateLocalValue if the local value matches the first parameter, 
update it to the
+     *                         second parameter and return true, return false 
otherwise.
+     */
+    MetadataUpdateLoop(LedgerManager lm,
+                       long ledgerId,
+                       Supplier<LedgerMetadata> currentLocalValue,
+                       NeedsUpdatePredicate needsTransformation,
+                       Function<LedgerMetadata, LedgerMetadata> transform,
+                       BiPredicate<LedgerMetadata, LedgerMetadata> 
updateLocalValue) {
+        this.lm = lm;
+        this.ledgerId = ledgerId;
+        this.currentLocalValue = currentLocalValue;
+        this.needsTransformation = needsTransformation;
+        this.transform = transform;
+        this.updateLocalValue = updateLocalValue;
+
+        this.logContext = String.format("UpdateLoop(ledgerId=%d,loopId=%08x)",
+                                        ledgerId, 
System.identityHashCode(this));
+    }
+
+    CompletableFuture<LedgerMetadata> run() {
+        CompletableFuture<LedgerMetadata> promise = new CompletableFuture<>();
+
+        writeLoop(currentLocalValue.get(), promise);
+
+        return promise;
+    }
+
+    private void writeLoop(LedgerMetadata currentLocal, 
CompletableFuture<LedgerMetadata> promise) {
+        LOG.info("{} starting write loop iteration, attempt {}", logContext, 
writeLoopCounter.incrementAndGet());
+        try {
+            if (needsTransformation.needsUpdate(currentLocal)) {
+                LedgerMetadata transformed = transform.apply(currentLocal);
+
+                writeToStore(ledgerId, transformed)
+                    .whenComplete((writtenMetadata, ex) -> {
+                            if (ex == null) {
+                                if (updateLocalValue.test(currentLocal, 
writtenMetadata)) {
+                                    LOG.info("{} success", logContext);
+                                    promise.complete(writtenMetadata);
+                                } else {
+                                    LOG.info("{} local value changed while we 
were writing, try again", logContext);
+                                    writeLoop(currentLocalValue.get(), 
promise);
+                                }
+                            } else if (ex instanceof 
BKException.BKMetadataVersionException) {
+                                LOG.info("{} conflict writing metadata to 
store, update local value and try again",
+                                         logContext);
+                                
updateFromStore(ledgerId).whenComplete((readMetadata, readEx) -> {
+                                        if (readEx == null) {
+                                            writeLoop(readMetadata, promise);
+                                        } else {
+                                            LOG.info("{} error updating local 
value", logContext);
 
 Review comment:
   This should be logged at error level.
   
   Also, the log message is ambiguous, because the failure could be a failed read from 
the metadata store rather than a failure to update the local value.
   
   So I would suggest not logging here; `updateFromStore` should do the logging instead.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to