This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 123ceab  Metadata Update mechanism
123ceab is described below

commit 123ceabe0a3de46fab6eb61bff9d530b7e212100
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Aug 7 17:33:09 2018 +0200

    Metadata Update mechanism
    
    This patch introduces a metadata update mechanism for the client which
    will be used in all places where metadata is updated.
    
    The mechanism takes a bunch of predicates and functions, and runs a
    loop against the ledger manager, attempting to apply the mutation
    required as specified.
    
    It assumes that the ledger metadata objects on the client side are
    immutable and that any metadata object read reflects state that exists
    on the metadata store. This isn't the case right now, but as the
    current metadata updates are changed to use this, it will be the case.
    
    This patch also introduces a limited LedgerMetadataBuilder, though
    only the fields required for testing are mutable.
    
    Master Issue: #281
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #1577 from ivankelly/ledger-fragment-immutable-metadata
---
 .../apache/bookkeeper/client/LedgerMetadata.java   |  45 +-
 .../bookkeeper/client/LedgerMetadataBuilder.java   | 115 ++++
 .../bookkeeper/client/MetadataUpdateLoop.java      | 201 +++++++
 .../bookkeeper/client/MetadataUpdateLoopTest.java  | 591 +++++++++++++++++++++
 4 files changed, 950 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index b63c41d..d549f7c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -80,7 +80,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
     private long length;
     private long lastEntryId;
     private long ctime;
-    private boolean storeSystemtimeAsLedgerCreationTime;
+    boolean storeSystemtimeAsLedgerCreationTime = false;
 
     private LedgerMetadataFormat.State state;
     private TreeMap<Long, ImmutableList<BookieSocketAddress>> ensembles =  new 
TreeMap<>();
@@ -130,6 +130,47 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         }
     }
 
+    LedgerMetadata(int ensembleSize,
+                   int writeQuorumSize,
+                   int ackQuorumSize,
+                   LedgerMetadataFormat.State state,
+                   java.util.Optional<Long> lastEntryId,
+                   Map<Long, List<BookieSocketAddress>> ensembles,
+                   DigestType digestType,
+                   java.util.Optional<byte[]> password,
+                   java.util.Optional<Long> ctime,
+                   Map<String, byte[]> customMetadata,
+                   Version version) {
+        checkArgument(ensembles.size() > 0, "There must be at least one 
ensemble in the ledger");
+
+        this.ensembleSize = ensembleSize;
+        this.writeQuorumSize = writeQuorumSize;
+        this.ackQuorumSize = ackQuorumSize;
+        this.state = state;
+        lastEntryId.ifPresent((eid) -> this.lastEntryId = eid);
+
+        setEnsembles(ensembles);
+        if (state != LedgerMetadataFormat.State.CLOSED) {
+            currentEnsemble = this.ensembles.lastEntry().getValue();
+        }
+
+        this.digestType = digestType.equals(DigestType.MAC)
+            ? LedgerMetadataFormat.DigestType.HMAC : 
LedgerMetadataFormat.DigestType.valueOf(digestType.toString());
+
+        password.ifPresent((pw) -> {
+                this.password = pw;
+                this.hasPassword = true;
+            });
+
+        ctime.ifPresent((c) -> {
+                this.ctime = c;
+                this.storeSystemtimeAsLedgerCreationTime = true;
+            });
+
+        this.customMetadata.putAll(customMetadata);
+        this.version = version;
+    }
+
     /**
      * Used for testing purpose only.
      */
@@ -185,7 +226,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         return ensembles;
     }
 
-    void setEnsembles(TreeMap<Long, ? extends List<BookieSocketAddress>> 
newEnsembles) {
+    void setEnsembles(Map<Long, ? extends List<BookieSocketAddress>> 
newEnsembles) {
         this.ensembles = newEnsembles.entrySet().stream()
             .collect(TreeMap::new,
                      (m, e) -> m.put(e.getKey(), 
ImmutableList.copyOf(e.getValue())),
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
new file mode 100644
index 0000000..76a7088
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.bookkeeper.versioning.Version;
+
+class LedgerMetadataBuilder {
+    private int ensembleSize = 3;
+    private int writeQuorumSize = 3;
+    private int ackQuorumSize = 2;
+
+    private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
+    private Optional<Long> lastEntryId = Optional.empty();
+
+    private Map<Long, List<BookieSocketAddress>> ensembles = new HashMap<>();
+
+    private DigestType digestType = DigestType.CRC32C;
+    private Optional<byte[]> password = Optional.empty();
+
+    private Optional<Long> ctime = Optional.empty();
+    private Map<String, byte[]> customMetadata = Collections.emptyMap();
+
+    private Version version = Version.NEW;
+
+    static LedgerMetadataBuilder create() {
+        return new LedgerMetadataBuilder();
+    }
+
+    static LedgerMetadataBuilder from(LedgerMetadata other) {
+        LedgerMetadataBuilder builder = new LedgerMetadataBuilder();
+        builder.ensembleSize = other.getEnsembleSize();
+        builder.writeQuorumSize = other.getWriteQuorumSize();
+        builder.ackQuorumSize = other.getAckQuorumSize();
+
+        builder.state = other.getState();
+
+        long lastEntryId = other.getLastEntryId();
+        if (lastEntryId != LedgerHandle.INVALID_ENTRY_ID) {
+            builder.lastEntryId = Optional.of(lastEntryId);
+        }
+
+        builder.ensembles.putAll(other.getEnsembles());
+
+        builder.digestType = other.getDigestType();
+        if (other.hasPassword()) {
+            builder.password = Optional.of(other.getPassword());
+        }
+
+        if (other.storeSystemtimeAsLedgerCreationTime) {
+            builder.ctime = Optional.of(other.getCtime());
+        }
+        builder.customMetadata = 
ImmutableMap.copyOf(other.getCustomMetadata());
+
+        builder.version = other.getVersion();
+
+        return builder;
+    }
+
+    LedgerMetadataBuilder withEnsembleSize(int ensembleSize) {
+        checkState(ensembles.size() == 0, "Can only set ensemble size before 
adding ensembles to the builder");
+        this.ensembleSize = ensembleSize;
+        return this;
+    }
+
+    LedgerMetadataBuilder withEnsembleEntry(long firstEntry, 
List<BookieSocketAddress> ensemble) {
+        checkArgument(ensemble.size() == ensembleSize,
+                      "Size of passed in ensemble must match the ensembleSize 
of the builder");
+
+        ensembles.put(firstEntry, ensemble);
+        return this;
+    }
+
+    LedgerMetadataBuilder closingAtEntry(long lastEntryId) {
+        this.lastEntryId = Optional.of(lastEntryId);
+        this.state = LedgerMetadataFormat.State.CLOSED;
+        return this;
+    }
+
+    LedgerMetadata build() {
+        return new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize,
+                                  state, lastEntryId, ensembles,
+                                  digestType, password, ctime, customMetadata,
+                                  version);
+    }
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MetadataUpdateLoop.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MetadataUpdateLoop.java
new file mode 100644
index 0000000..9cd2afd
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MetadataUpdateLoop.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Supplier;
+
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.versioning.Version;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mechanism to safely update the metadata of a ledger.
+ *
+ * <p>The loop takes the following steps:
+ * 1. Check if the metadata needs to be changed.
+ * 2. Make a copy of the metadata and modify it.
+ * 3. Write the modified copy to zookeeper.
+ * 3.1 If the write succeeds, go to 6.
+ * 3.2 If the write fails because of a failed compare and swap, go to 4.
+ * 4. Read the metadata back from the store
+ * 5. Update the local copy of the metadata with the metadata read in 4, go to 
1.
+ * 6. Update the local copy of the metadata with the metadata which has just 
been written.
+ *
+ * <p>All mutating operations are compare-and-swap operations. If the compare 
fails, another
+ * iteration of the loop begins.
+ */
+class MetadataUpdateLoop {
+    static final Logger LOG = 
LoggerFactory.getLogger(MetadataUpdateLoop.class);
+
+    private final LedgerManager lm;
+    private final long ledgerId;
+    private final Supplier<LedgerMetadata> currentLocalValue;
+    private final NeedsUpdatePredicate needsTransformation;
+    private final MetadataTransform transform;
+    private final LocalValueUpdater updateLocalValue;
+
+    private final String logContext;
+    private volatile int writeLoopCount = 0;
+    private static final AtomicIntegerFieldUpdater<MetadataUpdateLoop> 
WRITE_LOOP_COUNT_UPDATER =
+        AtomicIntegerFieldUpdater.newUpdater(MetadataUpdateLoop.class, 
"writeLoopCount");
+
+    interface NeedsUpdatePredicate {
+        boolean needsUpdate(LedgerMetadata metadata) throws Exception;
+    }
+
+    interface MetadataTransform {
+        LedgerMetadata transform(LedgerMetadata metadata) throws Exception;
+    }
+
+    interface LocalValueUpdater {
+        boolean updateValue(LedgerMetadata oldValue, LedgerMetadata newValue);
+    }
+
+    /**
+     * Construct the loop. This takes a set of functions which may be called 
multiple times
+     * during the loop.
+     *
+     * @param lm the ledger manager used for reading and writing metadata
+     * @param ledgerId the id of the ledger we will be operating on
+     * @param currentLocalValue should return the current local value of the 
metadata
+     * @param needsTransformation should return true, if the metadata needs to 
be modified.
+     *                            should throw an exception, if this update 
doesn't make sense.
+     * @param transform takes a metadata object, transforms it, and returns the result, 
without modifying
+     *                  the original
+     * @param updateLocalValue if the local value matches the first parameter, 
update it to the
+     *                         second parameter and return true, return false 
otherwise
+     */
+    MetadataUpdateLoop(LedgerManager lm,
+                       long ledgerId,
+                       Supplier<LedgerMetadata> currentLocalValue,
+                       NeedsUpdatePredicate needsTransformation,
+                       MetadataTransform transform,
+                       LocalValueUpdater updateLocalValue) {
+        this.lm = lm;
+        this.ledgerId = ledgerId;
+        this.currentLocalValue = currentLocalValue;
+        this.needsTransformation = needsTransformation;
+        this.transform = transform;
+        this.updateLocalValue = updateLocalValue;
+
+        this.logContext = String.format("UpdateLoop(ledgerId=%d,loopId=%08x)",
+                                        ledgerId, 
System.identityHashCode(this));
+    }
+
+    CompletableFuture<LedgerMetadata> run() {
+        CompletableFuture<LedgerMetadata> promise = new CompletableFuture<>();
+
+        writeLoop(currentLocalValue.get(), promise);
+
+        return promise;
+    }
+
+    private void writeLoop(LedgerMetadata currentLocal, 
CompletableFuture<LedgerMetadata> promise) {
+        LOG.debug("{} starting write loop iteration, attempt {}",
+                  logContext, WRITE_LOOP_COUNT_UPDATER.incrementAndGet(this));
+        try {
+            if (needsTransformation.needsUpdate(currentLocal)) {
+                LedgerMetadata transformed = transform.transform(currentLocal);
+
+                writeToStore(ledgerId, transformed)
+                    .whenComplete((writtenMetadata, ex) -> {
+                            if (ex == null) {
+                                if (updateLocalValue.updateValue(currentLocal, 
writtenMetadata)) {
+                                    LOG.debug("{} success", logContext);
+                                    promise.complete(writtenMetadata);
+                                } else {
+                                    LOG.debug("{} local value changed while we 
were writing, try again", logContext);
+                                    writeLoop(currentLocalValue.get(), 
promise);
+                                }
+                            } else if (ex instanceof 
BKException.BKMetadataVersionException) {
+                                LOG.info("{} conflict writing metadata to 
store, update local value and try again",
+                                         logContext);
+                                
updateLocalValueFromStore(ledgerId).whenComplete((readMetadata, readEx) -> {
+                                        if (readEx == null) {
+                                            writeLoop(readMetadata, promise);
+                                        } else {
+                                            
promise.completeExceptionally(readEx);
+                                        }
+                                    });
+                            } else {
+                                LOG.error("{} Error writing metadata to 
store", logContext, ex);
+                                promise.completeExceptionally(ex);
+                            }
+                        });
+            } else {
+                LOG.debug("{} Update not needed, completing", logContext);
+                promise.complete(currentLocal);
+            }
+        } catch (Exception e) {
+            LOG.error("{} Exception updating", logContext, e);
+            promise.completeExceptionally(e);
+        }
+    }
+
+    private CompletableFuture<LedgerMetadata> updateLocalValueFromStore(long 
ledgerId) {
+        CompletableFuture<LedgerMetadata> promise = new CompletableFuture<>();
+
+        readLoop(ledgerId, promise);
+
+        return promise;
+    }
+
+    private void readLoop(long ledgerId, CompletableFuture<LedgerMetadata> 
promise) {
+        LedgerMetadata current = currentLocalValue.get();
+
+        lm.readLedgerMetadata(ledgerId,
+                              (rc, read) -> {
+                                  if (rc != BKException.Code.OK) {
+                                      LOG.error("{} Failed to read metadata 
from store, rc = {}",
+                                                logContext, rc);
+                                      
promise.completeExceptionally(BKException.create(rc));
+                                  } else if 
(current.getVersion().compare(read.getVersion())
+                                             == Version.Occurred.CONCURRENTLY) 
{
+                                      // no update needed, these are the same 
in the immutable world
+                                      promise.complete(current);
+                                  } else if 
(updateLocalValue.updateValue(current, read)) {
+                                      // updated local value successfully
+                                      promise.complete(read);
+                                  } else {
+                                      // local value changed while we were 
reading,
+                                      // look at new value, and try to read 
again
+                                      readLoop(ledgerId, promise);
+                                  }
+                              });
+    }
+
+    private CompletableFuture<LedgerMetadata> writeToStore(long ledgerId, 
LedgerMetadata toWrite) {
+        CompletableFuture<LedgerMetadata> promise = new CompletableFuture<>();
+
+        lm.writeLedgerMetadata(ledgerId, toWrite,
+                               (rc, written) -> {
+                                   if (rc != BKException.Code.OK) {
+                                       
promise.completeExceptionally(BKException.create(rc));
+                                   } else {
+                                       promise.complete(written);
+                                   }
+                               });
+        return promise;
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
new file mode 100644
index 0000000..57a3de3
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+
+import org.apache.bookkeeper.test.TestCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.zookeeper.AsyncCallback;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test metadata update loop.
+ */
+public class MetadataUpdateLoopTest {
+    static final Logger LOG = 
LoggerFactory.getLogger(MetadataUpdateLoopTest.class);
+
+    /**
+     * Test that we can update the metadata using the update loop.
+     */
+    @Test
+    public void testBasicUpdate() throws Exception {
+        try (LedgerManager lm = new MockLedgerManager()) {
+            LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(5)
+                .withEnsembleEntry(0L, Lists.newArrayList(
+                                           new 
BookieSocketAddress("0.0.0.0:3181"),
+                                           new 
BookieSocketAddress("0.0.0.1:3181"),
+                                           new 
BookieSocketAddress("0.0.0.2:3181"),
+                                           new 
BookieSocketAddress("0.0.0.3:3181"),
+                                           new 
BookieSocketAddress("0.0.0.4:3181"))).build();
+            GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
+            long ledgerId = 1234L;
+            lm.createLedgerMetadata(ledgerId, initMeta, promise);
+            LedgerMetadata writtenMetadata = promise.get();
+
+            AtomicReference<LedgerMetadata> reference = new 
AtomicReference<>(writtenMetadata);
+
+            BookieSocketAddress newAddress = new 
BookieSocketAddress("0.0.0.5:3181");
+            MetadataUpdateLoop loop = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> true,
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(0, newAddress);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference::compareAndSet);
+            loop.run().get();
+
+            Assert.assertNotEquals(reference.get(), writtenMetadata);
+            Assert.assertEquals(reference.get().getEnsemble(0L).get(0), 
newAddress);
+        }
+    }
+
+    /**
+     * Test that when 2 update loops conflict when making different updates to 
the metadata,
+     * both will eventually succeed, and both updates will be reflected in the 
final metadata.
+     */
+    @Test
+    public void testConflictOnWrite() throws Exception {
+        try (BlockableMockLedgerManager lm = spy(new 
BlockableMockLedgerManager())) {
+            lm.blockWrites();
+
+            long ledgerId = 1234L;
+            BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
+            BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
+            BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
+            BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
+
+            LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(2)
+                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+            GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
+            lm.createLedgerMetadata(ledgerId, initMeta, promise);
+            LedgerMetadata writtenMetadata = promise.get();
+
+            AtomicReference<LedgerMetadata> reference1 = new 
AtomicReference<>(writtenMetadata);
+            CompletableFuture<LedgerMetadata> loop1 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference1::get,
+                    (currentMetadata) -> 
currentMetadata.getEnsemble(0L).contains(b0),
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(0, b2);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference1::compareAndSet).run();
+
+            AtomicReference<LedgerMetadata> reference2 = new 
AtomicReference<>(writtenMetadata);
+            CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference2::get,
+                    (currentMetadata) -> 
currentMetadata.getEnsemble(0L).contains(b1),
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(1, b3);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference2::compareAndSet).run();
+
+            lm.releaseWrites();
+
+            LedgerMetadata l1meta = loop1.get();
+            LedgerMetadata l2meta = loop2.get();
+
+            Assert.assertEquals(l1meta, reference1.get());
+            Assert.assertEquals(l2meta, reference2.get());
+
+            
Assert.assertEquals(l1meta.getVersion().compare(l2meta.getVersion()), 
Version.Occurred.BEFORE);
+
+            Assert.assertEquals(l1meta.getEnsemble(0L).get(0), b2);
+            Assert.assertEquals(l1meta.getEnsemble(0L).get(1), b1);
+
+            Assert.assertEquals(l2meta.getEnsemble(0L).get(0), b2);
+            Assert.assertEquals(l2meta.getEnsemble(0L).get(1), b3);
+
+            verify(lm, times(3)).writeLedgerMetadata(anyLong(), any(), any());
+        }
+    }
+
+    /**
+     * Test that when 2 update loops try to make the same modification, and 
they
+     * conflict on the write to the store, the one that receives the conflict 
won't
+     * try to write again, as the value is now correct.
+     */
+    @Test
+    public void testConflictOnWriteBothWritingSame() throws Exception {
+        try (BlockableMockLedgerManager lm = spy(new 
BlockableMockLedgerManager())) {
+            lm.blockWrites();
+
+            long ledgerId = 1234L;
+            BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
+            BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
+            BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
+
+            LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(2)
+                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+            GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
+            lm.createLedgerMetadata(ledgerId, initMeta, promise);
+            LedgerMetadata writtenMetadata = promise.get();
+
+            AtomicReference<LedgerMetadata> reference = new 
AtomicReference<>(writtenMetadata);
+
+            CompletableFuture<LedgerMetadata> loop1 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> 
currentMetadata.getEnsemble(0L).contains(b0),
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(0, b2);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference::compareAndSet).run();
+            CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> 
currentMetadata.getEnsemble(0L).contains(b0),
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(0, b2);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference::compareAndSet).run();
+
+            lm.releaseWrites();
+
+            Assert.assertEquals(loop1.get(), loop2.get());
+            Assert.assertEquals(loop1.get(), reference.get());
+
+            Assert.assertEquals(reference.get().getEnsemble(0L).get(0), b2);
+            Assert.assertEquals(reference.get().getEnsemble(0L).get(1), b1);
+
+            verify(lm, times(2)).writeLedgerMetadata(anyLong(), any(), any());
+        }
+    }
+
+    /**
+     * Test that when 2 update loops both manage to write, but conflict on updating
+     * the local value, the loser retries and both updates end up in the metadata.
+     */
+    @Test
+    public void testConflictOnLocalUpdate() throws Exception {
+        try (DeferCallbacksMockLedgerManager lm = spy(new 
DeferCallbacksMockLedgerManager(1))) {
+            long ledgerId = 1234L;
+            BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
+            BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
+            BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
+            BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
+
+            LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(2)
+                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+            GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
+            lm.createLedgerMetadata(ledgerId, initMeta, promise);
+            LedgerMetadata writtenMetadata = promise.get();
+
+            AtomicReference<LedgerMetadata> reference = new 
AtomicReference<>(writtenMetadata);
+
+            CompletableFuture<LedgerMetadata> loop1 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> 
currentMetadata.getEnsemble(0L).contains(b0),
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(0, b2);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference::compareAndSet).run();
+
+            lm.waitForWriteCount(1);
+            CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> 
currentMetadata.getEnsemble(0L).contains(b1),
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(1, b3);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference::compareAndSet).run();
+            Assert.assertEquals(loop2.get(), reference.get());
+
+            lm.runDeferred();
+
+            Assert.assertEquals(loop1.get(), reference.get());
+
+            Assert.assertEquals(reference.get().getEnsemble(0L).get(0), b2);
+            Assert.assertEquals(reference.get().getEnsemble(0L).get(1), b3);
+
+            verify(lm, times(3)).writeLedgerMetadata(anyLong(), any(), any());
+        }
+    }
+
+    private static BookieSocketAddress address(String s) {
+        try {
+            return new BookieSocketAddress(s);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Hammer test. Start one update loop per ensemble slot, all in flight at once, against a
+     * ledger manager that runs callbacks on random threads. Validate that every loop completes
+     * eventually and that the final metadata reflects all of the updates.
+     */
+    @Test
+    public void testHammer() throws Exception {
+        try (NonDeterministicMockLedgerManager lm = new NonDeterministicMockLedgerManager()) {
+            long ledgerId = 1234L;
+            int ensembleSize = 100;
+
+            // Slot n starts out as 0.0.0.n and is expected to end up as 0.0.n.1.
+            List<BookieSocketAddress> initialEnsemble = IntStream.range(0, ensembleSize)
+                .mapToObj((n) -> address(String.format("0.0.0.%d:3181", n)))
+                .collect(Collectors.toList());
+            List<BookieSocketAddress> replacementBookies = IntStream.range(0, ensembleSize)
+                .mapToObj((n) -> address(String.format("0.0.%d.1:3181", n)))
+                .collect(Collectors.toList());
+
+            LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
+                .withEnsembleEntry(0L, initialEnsemble).build();
+            GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
+            lm.createLedgerMetadata(ledgerId, initMeta, promise);
+            LedgerMetadata writtenMetadata = promise.get();
+
+            AtomicReference<LedgerMetadata> reference = new AtomicReference<>(writtenMetadata);
+
+            // Kick off one loop per slot; each swaps exactly its own bookie.
+            List<CompletableFuture<LedgerMetadata>> loops = Lists.newArrayList();
+            for (int i = 0; i < ensembleSize; i++) {
+                final int slot = i;
+                final BookieSocketAddress toReplace = initialEnsemble.get(slot);
+                final BookieSocketAddress replacement = replacementBookies.get(slot);
+                loops.add(new MetadataUpdateLoop(
+                        lm,
+                        ledgerId,
+                        reference::get,
+                        (current) -> current.getEnsemble(0L).contains(toReplace),
+                        (current) -> {
+                            List<BookieSocketAddress> updated = Lists.newArrayList(current.getEnsemble(0L));
+                            updated.set(slot, replacement);
+                            return LedgerMetadataBuilder.from(current).withEnsembleEntry(0L, updated).build();
+                        },
+                        reference::compareAndSet).run());
+            }
+
+            // Wait for every loop to finish before inspecting the local value.
+            loops.forEach(CompletableFuture::join);
+
+            Assert.assertEquals(reference.get().getEnsemble(0L), replacementBookies);
+        }
+    }
+
+    /**
+     * Test that if we have two conflicting updates, only one of the loops 
will complete.
+     * The other will throw an exception. Here one loop closes the ledger while 
the other
+     * replaces a bookie and aborts with BKLedgerClosedException when it sees a 
closed ledger.
+     */
+    @Test
+    public void testNewestValueCannotBeUsedAfterReadBack() throws Exception {
+        try (BlockableMockLedgerManager lm = spy(new 
BlockableMockLedgerManager())) {
+            // Queue metadata writes instead of applying them until releaseWrites() is called.
+            lm.blockWrites();
+
+            long ledgerId = 1234L;
+            BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
+            BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
+
+            LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(1)
+                .withEnsembleEntry(0L, Lists.newArrayList(b0)).build();
+            GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
+            lm.createLedgerMetadata(ledgerId, initMeta, promise);
+            LedgerMetadata writtenMetadata = promise.get();
+
+            AtomicReference<LedgerMetadata> reference = new 
AtomicReference<>(writtenMetadata);
+            // loop1 closes the ledger at entry 10, as long as it is not already closed.
+            CompletableFuture<LedgerMetadata> loop1 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> !currentMetadata.isClosed(),
+                    (currentMetadata) -> 
LedgerMetadataBuilder.from(currentMetadata).closingAtEntry(10L).build(),
+                    reference::compareAndSet).run();
+            // loop2 swaps b0 for b1, but gives up with an exception if the ledger is closed.
+            CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
+                    lm,
+                    ledgerId,
+                    reference::get,
+                    (currentMetadata) -> {
+                        if (currentMetadata.isClosed()) {
+                            throw new BKException.BKLedgerClosedException();
+                        } else {
+                            return 
currentMetadata.getEnsemble(0L).contains(b0);
+                        }
+                    },
+                    (currentMetadata) -> {
+                        List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
+                        ensemble.set(0, b1);
+                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                    },
+                    reference::compareAndSet).run();
+            // Submit the queued writes to the underlying mock, in the order they were queued.
+            lm.releaseWrites();
+
+            // loop1 must succeed; loop2 must fail because the ledger is closed.
+            LedgerMetadata l1meta = loop1.get();
+            try {
+                loop2.get();
+                Assert.fail("Update loop should have failed");
+            } catch (ExecutionException ee) {
+                Assert.assertEquals(ee.getCause().getClass(), 
BKException.BKLedgerClosedException.class);
+            }
+            // The local value is loop1's result: closed, with the original bookie still in place.
+            Assert.assertEquals(l1meta, reference.get());
+            Assert.assertEquals(l1meta.getEnsemble(0L).get(0), b0);
+            Assert.assertTrue(l1meta.isClosed());
+
+            // Exactly one write per loop reached the ledger manager.
+            verify(lm, times(2)).writeLedgerMetadata(anyLong(), any(), any());
+        }
+    }
+
+    /**
+     * Mock ledger manager that hands every callback to a cached thread pool instead of
+     * running it inline, so callbacks execute on whichever pool thread picks them up.
+     */
+    static class NonDeterministicMockLedgerManager extends MockLedgerManager {
+        /** Pool used to run callbacks; its threads are named "non-deter-N". */
+        final ExecutorService cbExecutor;
+
+        NonDeterministicMockLedgerManager() {
+            this.cbExecutor = Executors.newCachedThreadPool(
+                    new ThreadFactoryBuilder().setNameFormat("non-deter-%d").build());
+        }
+
+        /** Run the callback on the callback pool rather than on the calling thread. */
+        @Override
+        void executeCallback(Runnable r) {
+            cbExecutor.execute(r);
+        }
+
+        /** Stop the callback pool first, then let the parent shut itself down. */
+        @Override
+        public void close() {
+            cbExecutor.shutdownNow();
+            super.close();
+        }
+    }
+
+    static class DeferCallbacksMockLedgerManager extends MockLedgerManager {
+        int writeCount = 0;
+        final int numToDefer;
+        List<Triple<GenericCallback<LedgerMetadata>, Integer, LedgerMetadata>> 
deferred = Lists.newArrayList();
+
+        DeferCallbacksMockLedgerManager(int numToDefer) {
+            this.numToDefer = numToDefer;
+        }
+
+        synchronized void runDeferred() {
+            deferred.forEach((d) -> 
d.getLeft().operationComplete(d.getMiddle(), d.getRight()));
+        }
+
+        synchronized void waitForWriteCount(int count) throws Exception {
+            while (writeCount < count) {
+                wait();
+            }
+        }
+
+        @Override
+        public synchronized void writeLedgerMetadata(long ledgerId, 
LedgerMetadata metadata,
+                                                     
GenericCallback<LedgerMetadata> cb) {
+            super.writeLedgerMetadata(ledgerId, metadata,
+                                      (rc, written) -> {
+                                          synchronized 
(DeferCallbacksMockLedgerManager.this) {
+                                              if (writeCount++ < numToDefer) {
+                                                  LOG.info("Added aaaaato 
deferals");
+                                                  deferred.add(Triple.of(cb, 
rc, written));
+                                              } else {
+                                                  LOG.info("Completing {}", 
numToDefer);
+                                                  cb.operationComplete(rc, 
written);
+                                              }
+                                              
DeferCallbacksMockLedgerManager.this.notifyAll();
+                                          }
+                                      });
+        };
+    }
+
+    static class BlockableMockLedgerManager extends MockLedgerManager {
+        boolean blocking = false;
+        List<Triple<Long, LedgerMetadata, GenericCallback<LedgerMetadata>>> 
reqs = Lists.newArrayList();
+
+        synchronized void blockWrites() {
+            blocking = true;
+        }
+
+        synchronized void releaseWrites() {
+            blocking = false;
+            reqs.forEach((r) -> super.writeLedgerMetadata(r.getLeft(), 
r.getMiddle(), r.getRight()));
+        }
+
+        @Override
+        public synchronized void writeLedgerMetadata(long ledgerId, 
LedgerMetadata metadata,
+                                                     
GenericCallback<LedgerMetadata> cb) {
+            if (blocking) {
+                reqs.add(Triple.of(ledgerId, metadata, cb));
+            } else {
+                super.writeLedgerMetadata(ledgerId, metadata, cb);
+            }
+        };
+    }
+
+    static class MockLedgerManager implements LedgerManager {
+        final Map<Long, Pair<LongVersion, byte[]>> metadataMap = new 
HashMap<>();
+        final ExecutorService executor = Executors.newSingleThreadExecutor((r) 
-> new Thread(r, "MockLedgerManager"));
+
+        private LedgerMetadata readMetadata(long ledgerId) throws Exception {
+            Pair<LongVersion, byte[]> pair = metadataMap.get(ledgerId);
+            if (pair == null) {
+                return null;
+            } else {
+                return LedgerMetadata.parseConfig(pair.getRight(), 
pair.getLeft(), Optional.absent());
+            }
+        }
+
+        void executeCallback(Runnable r) {
+            r.run();
+        }
+
+        @Override
+        public void createLedgerMetadata(long ledgerId, LedgerMetadata 
metadata, GenericCallback<LedgerMetadata> cb) {
+            executor.submit(() -> {
+                    if (metadataMap.containsKey(ledgerId)) {
+                        executeCallback(() -> 
cb.operationComplete(BKException.Code.LedgerExistException, null));
+                    } else {
+                        metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), 
metadata.serialize()));
+                        try {
+                            LedgerMetadata readBack = readMetadata(ledgerId);
+                            executeCallback(() -> 
cb.operationComplete(BKException.Code.OK, readBack));
+                        } catch (Exception e) {
+                            LOG.error("Error reading back written metadata", 
e);
+                            executeCallback(() -> 
cb.operationComplete(BKException.Code.MetaStoreException, null));
+                        }
+                    }
+                });
+        }
+
+        @Override
+        public void removeLedgerMetadata(long ledgerId, Version version, 
GenericCallback<Void> cb) {}
+
+        @Override
+        public void readLedgerMetadata(long ledgerId, 
GenericCallback<LedgerMetadata> cb) {
+            executor.submit(() -> {
+                    try {
+                        LedgerMetadata metadata = readMetadata(ledgerId);
+                        if (metadata == null) {
+                            executeCallback(
+                                    () -> 
cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null));
+                        } else {
+                            executeCallback(() -> 
cb.operationComplete(BKException.Code.OK, metadata));
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Error reading metadata", e);
+                        executeCallback(() -> 
cb.operationComplete(BKException.Code.MetaStoreException, null));
+                    }
+                });
+        }
+
+        @Override
+        public void writeLedgerMetadata(long ledgerId, LedgerMetadata 
metadata, GenericCallback<LedgerMetadata> cb) {
+            executor.submit(() -> {
+                    try {
+                        LedgerMetadata oldMetadata = readMetadata(ledgerId);
+                        if (oldMetadata == null) {
+                            executeCallback(
+                                    () -> 
cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null));
+                        } else if 
(!oldMetadata.getVersion().equals(metadata.getVersion())) {
+                            executeCallback(
+                                    () -> 
cb.operationComplete(BKException.Code.MetadataVersionException, null));
+                        } else {
+                            LongVersion oldVersion = (LongVersion) 
oldMetadata.getVersion();
+                            metadataMap.put(ledgerId, Pair.of(new 
LongVersion(oldVersion.getLongVersion() + 1),
+                                                              
metadata.serialize()));
+                            LedgerMetadata readBack = readMetadata(ledgerId);
+                            executeCallback(() -> 
cb.operationComplete(BKException.Code.OK, readBack));
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Error writing metadata", e);
+                        executeCallback(
+                                () -> 
cb.operationComplete(BKException.Code.MetaStoreException, null));
+                    }
+                });
+
+        }
+
+        @Override
+        public void registerLedgerMetadataListener(long ledgerId, 
LedgerMetadataListener listener) {}
+
+        @Override
+        public void unregisterLedgerMetadataListener(long ledgerId, 
LedgerMetadataListener listener) {}
+
+        @Override
+        public void asyncProcessLedgers(Processor<Long> processor, 
AsyncCallback.VoidCallback finalCb,
+                Object context, int successRc, int failureRc) {
+        }
+
+        @Override
+        public LedgerRangeIterator getLedgerRanges() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            executor.shutdownNow();
+        }
+    }
+}

Reply via email to