This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4897e43 Added MockBookKeeper for unit tests
4897e43 is described below
commit 4897e43ca6b58f38f5bad7a04ba4b881b7c7551c
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Feb 27 01:29:34 2018 -0800
Added MockBookKeeper for unit tests
This mock class is what we use in Pulsar to do unit tests for the
managed-ledger library.
This approach might be useful in general, in all cases when creating unit
tests of an application that uses the BookKeeper API.
The `MockBookKeeper` behaves like the regular BK client, except it needs no
bookies or ZooKeeper. All ledgers and data are just kept in a hashmap in
memory.
It is possible to inject failures into the sync/async calls, even after N
steps. This is very useful to trigger failures in complex operations and cover
these paths in unit tests.
Author: Matteo Merli <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1209 from merlimat/mock-bookkeeper
---
.../org/apache/bookkeeper/client/BookKeeper.java | 31 +++
.../apache/bookkeeper/client/MockBookKeeper.java | 280 +++++++++++++++++++++
.../bookkeeper/client/MockBookKeeperTest.java | 54 ++++
.../apache/bookkeeper/client/MockLedgerEntry.java | 73 ++++++
.../apache/bookkeeper/client/MockLedgerHandle.java | 226 +++++++++++++++++
5 files changed, 664 insertions(+)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index f0fff6e..a94f206 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -540,6 +540,37 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
scheduleBookieHealthCheckIfEnabled();
}
+    /**
+     * Builds an inert client so that tests can subclass {@link BookKeeper} for mocking.
+     *
+     * <p>Every collaborator is either {@code null} or a no-op default; this constructor
+     * only assigns fields and starts nothing.
+     */
+    @VisibleForTesting
+    BookKeeper() {
+        // NOTE(review): the grouping below is by apparent role, inferred from field names.
+
+        // Configuration, stats and feature flags.
+        conf = new ClientConfiguration();
+        statsLogger = NullStatsLogger.INSTANCE;
+        featureProvider = null;
+        disableEnsembleChangeFeature = null;
+
+        // Executors, timers and event loop.
+        eventLoopGroup = null;
+        mainWorkerPool = null;
+        scheduler = null;
+        bookieInfoScheduler = null;
+        requestTimer = null;
+        ownTimer = false;
+
+        // Bookie registration, watching and placement.
+        regClient = null;
+        bookieWatcher = null;
+        bookieClient = null;
+        placementPolicy = null;
+
+        // Ledger metadata.
+        ledgerManagerFactory = null;
+        ledgerManager = null;
+        ledgerIdGenerator = null;
+
+        // Read/write behaviour settings.
+        reorderReadSequence = false;
+        readSpeculativeRequestPolicy = Optional.absent();
+        readLACSpeculativeRequestPolicy = Optional.absent();
+        explicitLacInterval = 0;
+        delayEnsembleChange = false;
+        addEntryQuorumTimeoutNanos = 0;
+    }
+
+
public int getExplicitLacInterval() {
return explicitLacInterval;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
new file mode 100644
index 0000000..c642e57
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocked version of BookKeeper client that keeps all ledgers data in memory.
+ *
+ * <p>This mocked client is meant to be used in unit tests for applications
using the BookKeeper API.
+ */
+public class MockBookKeeper extends BookKeeper {
+
+ final ExecutorService executor = Executors.newFixedThreadPool(1, new
DefaultThreadFactory("mock-bookkeeper"));
+ final ZooKeeper zkc;
+
+    /**
+     * Returns the ZooKeeper handle that was passed to the constructor, as-is.
+     */
+    @Override
+    public ZooKeeper getZkHandle() {
+        return this.zkc;
+    }
+
+    /**
+     * Returns the configuration held by the parent client, unchanged.
+     */
+    @Override
+    public ClientConfiguration getConf() {
+        final ClientConfiguration parentConf = super.getConf();
+        return parentConf;
+    }
+
+ Map<Long, MockLedgerHandle> ledgers = new ConcurrentHashMap<Long,
MockLedgerHandle>();
+ AtomicLong sequence = new AtomicLong(3);
+ AtomicBoolean stopped = new AtomicBoolean(false);
+ AtomicInteger stepsToFail = new AtomicInteger(-1);
+ int failReturnCode = BKException.Code.OK;
+ int nextFailReturnCode = BKException.Code.OK;
+
+    /**
+     * Creates a mocked client.
+     *
+     * @param zkc handle returned verbatim by {@link #getZkHandle()}; it is only stored here
+     * @throws Exception declared for signature compatibility; nothing in this body throws
+     */
+    public MockBookKeeper(ZooKeeper zkc) throws Exception {
+        super();
+        this.zkc = zkc;
+    }
+
+    /**
+     * Creates a ledger using the mock's default ensemble size (3) and quorum size (2).
+     */
+    @Override
+    public LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException {
+        final int defaultEnsembleSize = 3;
+        final int defaultQuorumSize = 2;
+        return createLedger(defaultEnsembleSize, defaultQuorumSize, digestType, passwd);
+    }
+
+ @Override
+ public LedgerHandle createLedger(int ensSize, int qSize, DigestType
digestType, byte passwd[]) throws BKException {
+ return createLedger(ensSize, qSize, qSize, digestType, passwd);
+ }
+
+    /**
+     * Creates an in-memory ledger on the mock executor and reports the outcome to {@code cb}.
+     *
+     * <p>{@code ensSize}, {@code writeQuorumSize}, {@code ackQuorumSize} and {@code properties}
+     * are accepted for API compatibility but are not used by the mock.
+     *
+     * @param digestType digest type recorded on the new handle
+     * @param passwd password recorded on the new handle
+     * @param cb callback receiving the result code and, on success, the new handle
+     * @param ctx opaque context handed back to {@code cb}
+     */
+    @Override
+    public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, final DigestType digestType,
+            final byte[] passwd, final CreateCallback cb, final Object ctx, Map<String, byte[]> properties) {
+        if (stopped.get()) {
+            cb.createComplete(BKException.Code.WriteException, null, ctx);
+            return;
+        }
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                if (getProgrammedFailStatus()) {
+                    // A programmed timeout is simulated by never invoking the callback.
+                    if (failReturnCode != BkTimeoutOperation) {
+                        cb.createComplete(failReturnCode, null, ctx);
+                    }
+                    return;
+                }
+
+                if (stopped.get()) {
+                    cb.createComplete(BKException.Code.WriteException, null, ctx);
+                    return;
+                }
+
+                final MockLedgerHandle lh;
+                try {
+                    long id = sequence.getAndIncrement();
+                    log.info("Creating ledger {}", id);
+                    lh = new MockLedgerHandle(MockBookKeeper.this, id, digestType, passwd);
+                    ledgers.put(id, lh);
+                } catch (Throwable t) {
+                    // Always complete the callback; otherwise the caller would wait forever.
+                    log.error("Failed to create mock ledger", t);
+                    cb.createComplete(BKException.Code.UnexpectedConditionException, null, ctx);
+                    return;
+                }
+
+                try {
+                    cb.createComplete(0, lh, ctx);
+                } catch (Throwable t) {
+                    // Keep a misbehaving callback from killing the single executor thread.
+                    log.error("Create callback threw for ledger {}", lh.getId(), t);
+                }
+            }
+        });
+    }
+
+    /**
+     * Synchronously creates an in-memory ledger.
+     *
+     * <p>The ensemble and quorum sizes are accepted for API compatibility but are not
+     * used by the mock.
+     *
+     * @return the newly registered handle, never {@code null}
+     * @throws BKException if a failure was programmed, the client is stopped, or the
+     *         handle could not be constructed
+     */
+    @Override
+    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType,
+            byte[] passwd) throws BKException {
+        checkProgrammedFail();
+
+        if (stopped.get()) {
+            throw BKException.create(BKException.Code.WriteException);
+        }
+
+        try {
+            long id = sequence.getAndIncrement();
+            log.info("Creating ledger {}", id);
+            MockLedgerHandle lh = new MockLedgerHandle(this, id, digestType, passwd);
+            ledgers.put(id, lh);
+            return lh;
+        } catch (Throwable t) {
+            // Surface the failure instead of returning null, which would only
+            // resurface later as a NullPointerException in the caller.
+            log.error("Exception:", t);
+            throw BKException.create(BKException.Code.UnexpectedConditionException);
+        }
+    }
+
+    /**
+     * Asynchronously creates a ledger with {@code qSize} used for both quorums and no
+     * custom properties.
+     */
+    @Override
+    public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb,
+            Object ctx) {
+        final Map<String, byte[]> noProperties = Collections.emptyMap();
+        asyncCreateLedger(ensSize, qSize, qSize, digestType, passwd, cb, ctx, noProperties);
+    }
+
+    /**
+     * Looks up a previously created ledger and hands it to {@code cb} on the calling thread.
+     *
+     * <p>The callback receives an error code when a failure is programmed, the client is
+     * stopped, the ledger is unknown, or the digest type or password does not match the
+     * values recorded on the handle. A programmed timeout never invokes the callback.
+     */
+    @Override
+    public void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) {
+        if (getProgrammedFailStatus()) {
+            final boolean simulateTimeout = failReturnCode == BkTimeoutOperation;
+            if (!simulateTimeout) {
+                cb.openComplete(failReturnCode, null, ctx);
+            }
+            return;
+        }
+
+        if (stopped.get()) {
+            cb.openComplete(BKException.Code.WriteException, null, ctx);
+            return;
+        }
+
+        final MockLedgerHandle handle = ledgers.get(lId);
+        if (handle == null) {
+            cb.openComplete(BKException.Code.NoSuchLedgerExistsException, null, ctx);
+            return;
+        }
+        if (handle.digest != digestType) {
+            cb.openComplete(BKException.Code.DigestMatchException, null, ctx);
+            return;
+        }
+        if (!Arrays.equals(handle.passwd, passwd)) {
+            cb.openComplete(BKException.Code.UnauthorizedAccessException, null, ctx);
+            return;
+        }
+
+        cb.openComplete(0, handle, ctx);
+    }
+
+ @Override
+ public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType,
byte[] passwd, OpenCallback cb, Object ctx) {
+ asyncOpenLedger(lId, digestType, passwd, cb, ctx);
+ }
+
+    /**
+     * Removes a ledger from the in-memory map and reports the result to {@code cb} on
+     * the calling thread. A programmed timeout never invokes the callback.
+     */
+    @Override
+    public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
+        if (getProgrammedFailStatus()) {
+            if (failReturnCode != BkTimeoutOperation) {
+                cb.deleteComplete(failReturnCode, ctx);
+            }
+            return;
+        }
+
+        if (stopped.get()) {
+            cb.deleteComplete(BKException.Code.WriteException, ctx);
+            return;
+        }
+
+        if (!ledgers.containsKey(lId)) {
+            cb.deleteComplete(BKException.Code.NoSuchLedgerExistsException, ctx);
+            return;
+        }
+
+        ledgers.remove(lId);
+        cb.deleteComplete(0, ctx);
+    }
+
+    /**
+     * Synchronously removes a ledger from the in-memory map.
+     *
+     * @throws BKException if a failure was programmed, the client is stopped, or the
+     *         ledger id is unknown
+     */
+    @Override
+    public void deleteLedger(long lId) throws InterruptedException, BKException {
+        checkProgrammedFail();
+
+        if (stopped.get()) {
+            throw BKException.create(BKException.Code.WriteException);
+        }
+
+        final boolean known = ledgers.containsKey(lId);
+        if (!known) {
+            throw BKException.create(BKException.Code.NoSuchLedgerExistsException);
+        }
+
+        ledgers.remove(lId);
+    }
+
+    /**
+     * Closes the mock: first honours any programmed failure, then shuts everything down.
+     */
+    @Override
+    public void close() throws InterruptedException, BKException {
+        this.checkProgrammedFail();
+        this.shutdown();
+    }
+
+    /**
+     * Stops the mock: marks it stopped, drops all in-memory ledgers and entries, and
+     * shuts down the executor. Programmed failures are not consulted here.
+     */
+    public void shutdown() {
+        try {
+            super.close();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            // Best effort: the parent was built by the no-arg test constructor, whose
+            // collaborators are null, so closing it is presumably expected to fail.
+            log.debug("Ignoring failure while closing the parent BookKeeper client", e);
+        }
+        stopped.set(true);
+        for (MockLedgerHandle ledger : ledgers.values()) {
+            ledger.entries.clear();
+        }
+
+        ledgers.clear();
+        executor.shutdownNow();
+    }
+
+    /**
+     * Tells whether {@link #shutdown()} has already marked this mock as stopped.
+     */
+    public boolean isStopped() {
+        final boolean alreadyStopped = stopped.get();
+        return alreadyStopped;
+    }
+
+    /**
+     * Returns the key set of the in-memory ledger map (a view of the map, not a copy).
+     */
+    public Set<Long> getLedgers() {
+        final Set<Long> ledgerIds = ledgers.keySet();
+        return ledgerIds;
+    }
+
+ void checkProgrammedFail() throws BKException {
+ int steps = stepsToFail.getAndDecrement();
+ log.debug("Steps to fail: {}", steps);
+ if (steps <= 0) {
+ if (failReturnCode != BKException.Code.OK) {
+ int rc = failReturnCode;
+ failReturnCode = nextFailReturnCode;
+ nextFailReturnCode = BKException.Code.OK;
+ throw BKException.create(rc);
+ }
+ }
+ }
+
+ boolean getProgrammedFailStatus() {
+ int steps = stepsToFail.getAndDecrement();
+ log.debug("Steps to fail: {}", steps);
+ return steps == 0;
+ }
+
+    /**
+     * Arms a failure with code {@code rc} for the very next operation, with no follow-up code.
+     */
+    public void failNow(int rc) {
+        final int noFollowUp = BKException.Code.OK;
+        failNow(rc, noFollowUp);
+    }
+
+    /**
+     * Arms a failure with code {@code rc} for the very next operation.
+     *
+     * @param rc code to fail with on the next operation
+     * @param nextErrorCode code queued to become the armed failure once {@code rc} has been thrown
+     */
+    public void failNow(int rc, int nextErrorCode) {
+        // Forward nextErrorCode; the two-argument failAfter would reset it to OK.
+        failAfter(0, rc, nextErrorCode);
+    }
+
+    /**
+     * Arms a failure with code {@code rc} after {@code steps} operations, with no follow-up code.
+     */
+    public void failAfter(int steps, int rc) {
+        final int noFollowUp = BKException.Code.OK;
+        failAfter(steps, rc, noFollowUp);
+    }
+
+    /**
+     * Arms the programmed-failure state.
+     *
+     * @param steps value loaded into the countdown
+     * @param rc code stored as the armed failure
+     * @param nextErrorCode code stored as the follow-up failure
+     */
+    public void failAfter(int steps, int rc, int nextErrorCode) {
+        this.stepsToFail.set(steps);
+        this.failReturnCode = rc;
+        this.nextFailReturnCode = nextErrorCode;
+    }
+
+    /**
+     * Arms a simulated timeout: loads the countdown with {@code steps} and stores the
+     * internal timeout marker as the armed failure code.
+     */
+    public void timeoutAfter(int steps) {
+        this.stepsToFail.set(steps);
+        this.failReturnCode = BkTimeoutOperation;
+    }
+
+ private static final int BkTimeoutOperation = 1000;
+
+ private static final Logger log =
LoggerFactory.getLogger(MockBookKeeper.class);
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTest.java
new file mode 100644
index 0000000..85a654c
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.junit.Test;
+
+/**
+ * Test the mocked BookKeeper client.
+ */
+public class MockBookKeeperTest {
+
+    /**
+     * Writes two entries through the mocked client and reads them back.
+     */
+    @Test
+    public void testMockedBookKeeper() throws Exception {
+        BookKeeper bkc = new MockBookKeeper(null);
+        try {
+            LedgerHandle lh = bkc.createLedger(DigestType.CRC32, new byte[0]);
+
+            assertEquals(0, lh.addEntry("entry-0".getBytes()));
+            assertEquals(1, lh.addEntry("entry-1".getBytes()));
+
+            assertEquals(1, lh.getLastAddConfirmed());
+
+            Enumeration<LedgerEntry> entries = lh.readEntries(0, 1);
+            assertTrue(entries.hasMoreElements());
+            assertEquals("entry-0", new String(entries.nextElement().getEntry()));
+            assertTrue(entries.hasMoreElements());
+            assertEquals("entry-1", new String(entries.nextElement().getEntry()));
+            assertFalse(entries.hasMoreElements());
+
+            lh.close();
+        } finally {
+            // Close even when an assertion fails so the mock's executor thread is shut down.
+            bkc.close();
+        }
+    }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java
new file mode 100644
index 0000000..10c25d5
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.InputStream;
+
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+/**
+ * Mocked BK {@link LedgerEntry}. Used by {@link MockLedgerHandle}.
+ */
+public class MockLedgerEntry extends LedgerEntry {
+
+    final long ledgerId;
+    final long entryId;
+    // Payload as handed to the constructor; stored and returned without copying.
+    final byte[] data;
+
+    /**
+     * Creates an entry backed directly by {@code data}.
+     *
+     * @param ledgerId id of the owning ledger
+     * @param entryId id of this entry within the ledger
+     * @param data entry payload; must not be {@code null}
+     */
+    public MockLedgerEntry(long ledgerId, long entryId, byte[] data) {
+        super(LedgerEntryImpl.create(ledgerId, entryId, data.length, Unpooled.wrappedBuffer(data)));
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+        this.data = data;
+    }
+
+    @Override
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    @Override
+    public long getEntryId() {
+        return entryId;
+    }
+
+    /** Returns the payload size in bytes. */
+    @Override
+    public long getLength() {
+        return data.length;
+    }
+
+    /** Returns the backing payload array itself (not a copy). */
+    @Override
+    public byte[] getEntry() {
+        return data;
+    }
+
+    /** Returns a fresh buffer wrapping the payload on every call. */
+    @Override
+    public ByteBuf getEntryBuffer() {
+        return Unpooled.wrappedBuffer(data);
+    }
+
+    /**
+     * Returns a fresh stream over the payload on every call, instead of {@code null},
+     * so that callers reading the entry as a stream do not hit a NullPointerException.
+     */
+    @Override
+    public InputStream getEntryInputStream() {
+        // Fully qualified so this class does not need an additional import.
+        return new java.io.ByteArrayInputStream(data);
+    }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
new file mode 100644
index 0000000..c914b10
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.Queue;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock BK {@link LedgerHandle}. Used by {@link MockBookKeeper}.
+ */
+public class MockLedgerHandle extends LedgerHandle {
+
+ final ArrayList<LedgerEntry> entries = Lists.newArrayList();
+ final MockBookKeeper bk;
+ final long id;
+ final DigestType digest;
+ final byte[] passwd;
+ long lastEntry = -1;
+ boolean fenced = false;
+
+    /**
+     * Creates a handle for ledger {@code id}.
+     *
+     * <p>The parent handle is always initialised with fixed metadata (3, 3, 2), the MAC
+     * digest and an empty password; the caller-supplied {@code digest} and a private
+     * copy of {@code passwd} are only recorded on this mock's own fields.
+     */
+    MockLedgerHandle(MockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException {
+        super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
+                EnumSet.noneOf(WriteFlag.class));
+        this.id = id;
+        this.bk = bk;
+        this.passwd = passwd.clone();
+        this.digest = digest;
+    }
+
+    /**
+     * Fences this handle and notifies {@code cb}.
+     *
+     * <p>A programmed failure is reported immediately on the calling thread without
+     * fencing. Otherwise the callback runs on the mock executor, or inline if the
+     * executor rejects the task.
+     */
+    @Override
+    public void asyncClose(CloseCallback cb, Object ctx) {
+        if (bk.getProgrammedFailStatus()) {
+            cb.closeComplete(bk.failReturnCode, this, ctx);
+            return;
+        }
+
+        fenced = true;
+
+        final Runnable notifyClosed = () -> cb.closeComplete(0, this, ctx);
+        try {
+            bk.executor.execute(notifyClosed);
+        } catch (RejectedExecutionException e) {
+            notifyClosed.run();
+        }
+    }
+
+    /**
+     * Reads entries {@code firstEntry..lastEntry} (inclusive) on the mock executor.
+     *
+     * <p>The range is silently truncated at the number of entries currently stored.
+     * The callback receives {@code -1} when the client is stopped and the armed failure
+     * code when a failure is programmed for this step.
+     */
+    @Override
+    public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb,
+            final Object ctx) {
+        if (bk.isStopped()) {
+            cb.readComplete(-1, MockLedgerHandle.this, null, ctx);
+            return;
+        }
+
+        bk.executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                if (bk.getProgrammedFailStatus()) {
+                    cb.readComplete(bk.failReturnCode, MockLedgerHandle.this, null, ctx);
+                    return;
+                } else if (bk.isStopped()) {
+                    log.debug("Bookkeeper is closed!");
+                    cb.readComplete(-1, MockLedgerHandle.this, null, ctx);
+                    return;
+                }
+
+                log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
+                final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(entries.get((int) entryId++));
+                }
+
+                log.debug("Entries read: {}", seq);
+
+                try {
+                    // Tiny delay so the read does not complete instantaneously.
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt for whoever owns this thread; still deliver the result.
+                    Thread.currentThread().interrupt();
+                }
+
+                cb.readComplete(0, MockLedgerHandle.this, new Enumeration<LedgerEntry>() {
+                    @Override
+                    public boolean hasMoreElements() {
+                        return !seq.isEmpty();
+                    }
+
+                    @Override
+                    public LedgerEntry nextElement() {
+                        return seq.remove();
+                    }
+
+                }, ctx);
+            }
+        });
+    }
+
+    /**
+     * Synchronously appends {@code data} as the next entry.
+     *
+     * <p>A programmed failure fences the handle before being rethrown.
+     *
+     * @return the id assigned to the new entry
+     * @throws BKException on a programmed failure, a fenced handle, or a stopped client
+     */
+    @Override
+    public long addEntry(byte[] data) throws InterruptedException, BKException {
+        try {
+            bk.checkProgrammedFail();
+        } catch (BKException programmedFailure) {
+            fenced = true;
+            throw programmedFailure;
+        }
+
+        if (fenced) {
+            throw BKException.create(BKException.Code.LedgerFencedException);
+        }
+        if (bk.isStopped()) {
+            throw BKException.create(BKException.Code.NoBookieAvailableException);
+        }
+
+        final long newEntryId = entries.size();
+        lastEntry = newEntryId;
+        entries.add(new MockLedgerEntry(ledgerId, newEntryId, data));
+        return newEntryId;
+    }
+
+    /**
+     * Asynchronously appends the whole of {@code data}.
+     */
+    @Override
+    public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
+        final int wholeLength = data.length;
+        asyncAddEntry(data, 0, wholeLength, cb, ctx);
+    }
+
+    /**
+     * Asynchronously appends {@code length} bytes of {@code data} starting at {@code offset}.
+     */
+    @Override
+    public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb,
+            final Object ctx) {
+        final ByteBuf slice = Unpooled.wrappedBuffer(data, offset, length);
+        asyncAddEntry(slice, cb, ctx);
+    }
+
+    /**
+     * Asynchronously appends the readable bytes of {@code data} as the next entry.
+     *
+     * <p>The buffer is retained for the duration of the asynchronous task and released
+     * exactly once when the task finishes, whatever the outcome. A programmed failure
+     * fences the handle. The callback receives {@code -1} when the client is stopped.
+     */
+    @Override
+    public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) {
+        if (bk.isStopped()) {
+            cb.addComplete(-1, MockLedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            return;
+        }
+
+        data.retain();
+        try {
+            bk.executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        addOnExecutor(data, cb, ctx);
+                    } finally {
+                        // Single release point: covers every branch and a throwing callback.
+                        data.release();
+                    }
+                }
+            });
+        } catch (RejectedExecutionException e) {
+            // The task will never run, so drop the reference taken above before propagating.
+            data.release();
+            throw e;
+        }
+    }
+
+    /** Performs the add on the executor thread; the caller owns releasing {@code data}. */
+    private void addOnExecutor(ByteBuf data, AddCallback cb, Object ctx) {
+        if (bk.getProgrammedFailStatus()) {
+            fenced = true;
+            cb.addComplete(bk.failReturnCode, MockLedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            return;
+        }
+        if (bk.isStopped()) {
+            cb.addComplete(-1, MockLedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            return;
+        }
+
+        try {
+            // Tiny delay so the add does not complete instantaneously.
+            Thread.sleep(1);
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for whoever owns this thread; still complete the add.
+            Thread.currentThread().interrupt();
+        }
+
+        if (fenced) {
+            cb.addComplete(BKException.Code.LedgerFencedException, MockLedgerHandle.this,
+                    LedgerHandle.INVALID_ENTRY_ID, ctx);
+            return;
+        }
+
+        lastEntry = entries.size();
+        byte[] storedData = new byte[data.readableBytes()];
+        data.readBytes(storedData);
+        LedgerEntry entry = new MockLedgerEntry(ledgerId, lastEntry, storedData);
+        entries.add(entry);
+        cb.addComplete(0, MockLedgerHandle.this, lastEntry, ctx);
+    }
+
+    /**
+     * Returns the ledger id held in the inherited {@code ledgerId} field.
+     */
+    @Override
+    public long getId() {
+        return this.ledgerId;
+    }
+
+    /**
+     * Returns the id of the most recently added entry, or {@code -1} if none was added.
+     */
+    @Override
+    public long getLastAddConfirmed() {
+        return this.lastEntry;
+    }
+
+    /**
+     * Returns the sum of the lengths of all entries currently stored in this handle.
+     */
+    @Override
+    public long getLength() {
+        return entries.stream()
+                .mapToLong(LedgerEntry::getLength)
+                .sum();
+    }
+
+ private static final Logger log =
LoggerFactory.getLogger(MockLedgerHandle.class);
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].