This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bf28eca  newOpenLedgerOp and ReadHandle implementation for 
MockBookKeeper
bf28eca is described below

commit bf28ecade1b92204fd97a7cfd9ad3ff885382fbf
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Apr 5 00:02:06 2018 -0700

    newOpenLedgerOp and ReadHandle implementation for MockBookKeeper
    
    This will allow users of MockBookKeeper to use the new ReadHandle API.
    
    There are two implementations of the ReadHandle interface. One pure,
    which isn't backed by any production code, and another on
    MockLedgerHandle which passes through to the pure implementation.
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1298 from ivankelly/better-mock
---
 .../org/apache/bookkeeper/client/LedgerHandle.java |   6 +
 .../org/apache/bookkeeper/client/LedgerOpenOp.java |  48 +------
 .../bookkeeper/client/impl/OpenBuilderBase.java    |  72 +++++++++++
 .../apache/bookkeeper/client/MockBookKeeper.java   |  40 ++++++
 .../apache/bookkeeper/client/MockLedgerHandle.java |  54 +++++++-
 .../apache/bookkeeper/client/MockReadHandle.java   | 140 +++++++++++++++++++++
 6 files changed, 311 insertions(+), 49 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a9a85bc..2d34460 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -120,6 +120,12 @@ public class LedgerHandle implements WriteHandle {
      */
     public static final long INVALID_ENTRY_ID = 
BookieProtocol.INVALID_ENTRY_ID;
 
+    /**
+     * Invalid ledger id. Ledger IDs must be greater than or equal to 0.
+     * Large negative used to make it easy to spot in logs if erroneously used.
+     */
+    public static final long INVALID_LEDGER_ID = -0xABCDABCDL;
+
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
     final int maxAllowedEnsembleChanges;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 33cfaf2..c3a69bd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -33,8 +33,8 @@ import 
org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
-import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -225,13 +225,8 @@ class LedgerOpenOp implements 
GenericCallback<LedgerMetadata> {
         cb.openComplete(rc, lh, ctx);
     }
 
-    static final class OpenBuilderImpl implements OpenBuilder {
+    static final class OpenBuilderImpl extends OpenBuilderBase {
 
-        private boolean builderRecovery = false;
-        private Long builderLedgerId;
-        private byte[] builderPassword;
-        private org.apache.bookkeeper.client.api.DigestType builderDigestType =
-            org.apache.bookkeeper.client.api.DigestType.CRC32;
         private final BookKeeper bk;
 
         OpenBuilderImpl(BookKeeper bookkeeper) {
@@ -239,30 +234,6 @@ class LedgerOpenOp implements 
GenericCallback<LedgerMetadata> {
         }
 
         @Override
-        public OpenBuilder withLedgerId(long ledgerId) {
-            this.builderLedgerId = ledgerId;
-            return this;
-        }
-
-        @Override
-        public OpenBuilder withRecovery(boolean recovery) {
-            this.builderRecovery = recovery;
-            return this;
-        }
-
-        @Override
-        public OpenBuilder withPassword(byte[] password) {
-            this.builderPassword = password;
-            return this;
-        }
-
-        @Override
-        public OpenBuilder 
withDigestType(org.apache.bookkeeper.client.api.DigestType digestType) {
-            this.builderDigestType = digestType;
-            return this;
-        }
-
-        @Override
         public CompletableFuture<ReadHandle> execute() {
             CompletableFuture<ReadHandle> future = new CompletableFuture<>();
             SyncOpenCallback callback = new SyncOpenCallback(future);
@@ -270,23 +241,14 @@ class LedgerOpenOp implements 
GenericCallback<LedgerMetadata> {
             return future;
         }
 
-        private boolean validate() {
-            if (builderLedgerId == null || builderLedgerId < 0) {
-                LOG.error("invalid ledgerId {} < 0", builderLedgerId);
-                return false;
-            }
-            return true;
-        }
-
         private void open(OpenCallback cb) {
-
             if (!validate()) {
                 cb.openComplete(BKException.Code.NoSuchLedgerExistsException, 
null, null);
                 return;
             }
 
-            LedgerOpenOp op = new LedgerOpenOp(bk, builderLedgerId, 
fromApiDigestType(builderDigestType),
-                builderPassword, cb, null);
+            LedgerOpenOp op = new LedgerOpenOp(bk, ledgerId, 
fromApiDigestType(digestType),
+                                               password, cb, null);
             ReentrantReadWriteLock closeLock = bk.getCloseLock();
             closeLock.readLock().lock();
             try {
@@ -294,7 +256,7 @@ class LedgerOpenOp implements 
GenericCallback<LedgerMetadata> {
                     cb.openComplete(BKException.Code.ClientClosedException, 
null, null);
                     return;
                 }
-                if (builderRecovery) {
+                if (recovery) {
                     op.initiate();
                 } else {
                     op.initiateWithoutRecovery();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/OpenBuilderBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/OpenBuilderBase.java
new file mode 100644
index 0000000..b22effc
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/OpenBuilderBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client.impl;
+
+import java.util.Arrays;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for open builders which does the mundane builder stuff.
+ */
+public abstract class OpenBuilderBase implements OpenBuilder {
+    static final Logger LOG = LoggerFactory.getLogger(OpenBuilderBase.class);
+
+    protected boolean recovery = false;
+    protected long ledgerId = LedgerHandle.INVALID_LEDGER_ID;
+    protected byte[] password;
+    protected DigestType digestType = DigestType.CRC32;
+
+    @Override
+    public OpenBuilder withLedgerId(long ledgerId) {
+        this.ledgerId = ledgerId;
+        return this;
+    }
+
+    @Override
+    public OpenBuilder withRecovery(boolean recovery) {
+        this.recovery = recovery;
+        return this;
+    }
+
+    @Override
+    public OpenBuilder withPassword(byte[] password) {
+        this.password = Arrays.copyOf(password, password.length);
+        return this;
+    }
+
+    @Override
+    public OpenBuilder withDigestType(DigestType digestType) {
+        this.digestType = digestType;
+        return this;
+    }
+
+    protected boolean validate() {
+        if (ledgerId < 0) {
+            LOG.error("invalid ledgerId {} < 0", ledgerId);
+            return false;
+        }
+        return true;
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index c642e57..3b63cfd 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -34,6 +35,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -210,6 +214,42 @@ public class MockBookKeeper extends BookKeeper {
         shutdown();
     }
 
+    @Override
+    public OpenBuilder newOpenLedgerOp() {
+        return new OpenBuilderBase() {
+            @Override
+            public CompletableFuture<ReadHandle> execute() {
+                CompletableFuture<ReadHandle> promise = new 
CompletableFuture<ReadHandle>();
+
+                if (!validate()) {
+                    promise.completeExceptionally(new 
BKException.BKNoSuchLedgerExistsException());
+                    return promise;
+                } else if (getProgrammedFailStatus()) {
+                    if (failReturnCode != BkTimeoutOperation) {
+                        
promise.completeExceptionally(BKException.create(failReturnCode));
+                    }
+                    return promise;
+                } else if (stopped.get()) {
+                    promise.completeExceptionally(new 
BKException.BKClientClosedException());
+                    return promise;
+                }
+
+                MockLedgerHandle lh = ledgers.get(ledgerId);
+                if (lh == null) {
+                    promise.completeExceptionally(new 
BKException.BKNoSuchLedgerExistsException());
+                } else if (lh.digest != 
DigestType.fromApiDigestType(digestType)) {
+                    promise.completeExceptionally(new 
BKException.BKDigestMatchException());
+                } else if (!Arrays.equals(lh.passwd, password)) {
+                    promise.completeExceptionally(new 
BKException.BKUnauthorizedAccessException());
+                } else {
+                    promise.complete(new MockReadHandle(MockBookKeeper.this, 
ledgerId,
+                                                        
lh.getLedgerMetadata(), lh.entries));
+                }
+                return promise;
+            }
+        };
+    }
+
     public void shutdown() {
         try {
             super.close();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index c914b10..6b4bea4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -30,13 +30,18 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Enumeration;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +50,12 @@ import org.slf4j.LoggerFactory;
  */
 public class MockLedgerHandle extends LedgerHandle {
 
-    final ArrayList<LedgerEntry> entries = Lists.newArrayList();
+    final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
     final MockBookKeeper bk;
     final long id;
     final DigestType digest;
     final byte[] passwd;
+    final ReadHandle readHandle;
     long lastEntry = -1;
     boolean fenced = false;
 
@@ -60,6 +66,8 @@ public class MockLedgerHandle extends LedgerHandle {
         this.id = id;
         this.digest = digest;
         this.passwd = Arrays.copyOf(passwd, passwd.length);
+
+        readHandle = new MockReadHandle(bk, id, getLedgerMetadata(), entries);
     }
 
     @Override
@@ -100,7 +108,7 @@ public class MockLedgerHandle extends LedgerHandle {
                 final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
                 long entryId = firstEntry;
                 while (entryId <= lastEntry && entryId < entries.size()) {
-                    seq.add(entries.get((int) entryId++));
+                    seq.add(new LedgerEntry(entries.get((int) entryId++)));
                 }
 
                 log.debug("Entries read: {}", seq);
@@ -142,7 +150,7 @@ public class MockLedgerHandle extends LedgerHandle {
         }
 
         lastEntry = entries.size();
-        entries.add(new MockLedgerEntry(ledgerId, lastEntry, data));
+        entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, 
Unpooled.wrappedBuffer(data)));
         return lastEntry;
     }
 
@@ -192,8 +200,8 @@ public class MockLedgerHandle extends LedgerHandle {
                     lastEntry = entries.size();
                     byte[] storedData = new byte[data.readableBytes()];
                     data.readBytes(storedData);
-                    LedgerEntry entry = new MockLedgerEntry(ledgerId, 
lastEntry, storedData);
-                    entries.add(entry);
+                    entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
+                                                       storedData.length, 
Unpooled.wrappedBuffer(storedData)));
                     data.release();
                     cb.addComplete(0, MockLedgerHandle.this, lastEntry, ctx);
                 }
@@ -214,13 +222,47 @@ public class MockLedgerHandle extends LedgerHandle {
     @Override
     public long getLength() {
         long length = 0;
-        for (LedgerEntry entry : entries) {
+        for (LedgerEntryImpl entry : entries) {
             length += entry.getLength();
         }
 
         return length;
     }
 
+
+    // ReadHandle interface
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
+        return readHandle.readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+        return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return readHandle.readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readHandle.tryReadLastAddConfirmedAsync();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return readHandle.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> 
readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                               
       long timeOutInMillis,
+                                                                               
       boolean parallel) {
+        return readHandle.readLastAddConfirmedAndEntryAsync(entryId, 
timeOutInMillis, parallel);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockLedgerHandle.class);
 
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
new file mode 100644
index 0000000..e3e3ffe
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+
+/**
+ * Mock implementation of ReadHandle.
+ */
+@Slf4j
+class MockReadHandle implements ReadHandle {
+    private final MockBookKeeper bk;
+    private final long ledgerId;
+    private final LedgerMetadata metadata;
+    private final List<LedgerEntryImpl> entries;
+
+    MockReadHandle(MockBookKeeper bk, long ledgerId, LedgerMetadata metadata, 
List<LedgerEntryImpl> entries) {
+        this.bk = bk;
+        this.ledgerId = ledgerId;
+        this.metadata = metadata;
+        this.entries = entries;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
+        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+        if (bk.isStopped()) {
+            promise.completeExceptionally(new 
BKException.BKClientClosedException());
+            return promise;
+        }
+
+        bk.executor.execute(() -> {
+                if (bk.getProgrammedFailStatus()) {
+                    
promise.completeExceptionally(BKException.create(bk.failReturnCode));
+                    return;
+                } else if (bk.isStopped()) {
+                    promise.completeExceptionally(new 
BKException.BKClientClosedException());
+                    return;
+                }
+
+                log.debug("readEntries: first={} last={} total={}", 
firstEntry, lastEntry, entries.size());
+                List<LedgerEntry> seq = new ArrayList<>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(entries.get((int) entryId++).duplicate());
+                }
+                log.debug("Entries read: {}", seq);
+                promise.complete(LedgerEntriesImpl.create(seq));
+            });
+        return promise;
+
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return entries.get(entries.size() - 1).getEntryId();
+    }
+
+    @Override
+    public long getLength() {
+        long length = 0;
+        for (LedgerEntryImpl entry : entries) {
+            length += entry.getLength();
+        }
+
+        return length;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return metadata.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> 
readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                               
       long timeOutInMillis,
+                                                                               
       boolean parallel) {
+        CompletableFuture<LastConfirmedAndEntry> promise = new 
CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException("Long 
poll not implemented"));
+        return promise;
+    }
+
+    // Handle interface
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return metadata;
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to