This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a6f4b79 ISSUE #693: add interface and implementation of LedgerEntries
a6f4b79 is described below
commit a6f4b792270194474649eda9261635c24c218b0a
Author: Jia Zhai <[email protected]>
AuthorDate: Thu Nov 23 19:56:21 2017 +0800
ISSUE #693: add interface and implementation of LedgerEntries
In the new API proposed in #506, we return Iterable<LedgerEntry> for read
entries. This is problematic when multiple iterators are returned, because
there is no way to release the entry buffers held by the ledger entries.
In this change, the implementation of retainIterator() increases the
reference count of the buffers when creating an iterator, which makes
ByteBuf management easier; it also makes LedgerEntries a recyclable object
that is returned to the pool when #close() is called. On #close(), it also
releases all the references to free the underlying resources.
Descriptions of the changes in this PR:
1. add interface LedgerEntries;
2. add implementation of LedgerEntries;
3. fix related build and test errors.
Author: Jia Zhai <[email protected]>
Reviewers: ivankelly, sijie, eolivelli
This closes #727 from zhaijack/issue-693, closes #693
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 15 +--
.../apache/bookkeeper/client/PendingReadOp.java | 9 +-
.../bookkeeper/client/api/LedgerEntries.java | 46 ++++++++
.../apache/bookkeeper/client/api/ReadHandle.java | 4 +-
.../bookkeeper/client/impl/LedgerEntriesImpl.java | 104 +++++++++++++++++
.../apache/bookkeeper/client/TestParallelRead.java | 12 +-
.../bookkeeper/client/api/BookKeeperApiTest.java | 10 +-
.../client/impl/LedgerEntriesImplTest.java | 124 +++++++++++++++++++++
8 files changed, 298 insertions(+), 26 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 49962a9..51efefd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -63,6 +63,7 @@ import
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
import
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
@@ -634,7 +635,7 @@ public class LedgerHandle implements WriteHandle {
* id of last entry of sequence
*/
@Override
- public
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
read(long firstEntry, long lastEntry) {
+ public CompletableFuture<LedgerEntries> read(long firstEntry, long
lastEntry) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{} lastEntry:{}",
@@ -675,7 +676,7 @@ public class LedgerHandle implements WriteHandle {
* @see #readUnconfirmedEntries(long, long)
*/
@Override
- public
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
readUnconfirmed(long firstEntry, long lastEntry) {
+ public CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry,
long lastEntry) {
// Little sanity check
if (firstEntry < 0 || firstEntry > lastEntry) {
LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{} lastEntry:{}",
@@ -689,14 +690,14 @@ public class LedgerHandle implements WriteHandle {
void asyncReadEntriesInternal(long firstEntry, long lastEntry,
ReadCallback cb, Object ctx) {
if(!bk.isClosed()) {
readEntriesInternalAsync(firstEntry, lastEntry)
- .whenCompleteAsync(new
FutureEventListener<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>() {
+ .whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
@Override
- public void
onSuccess(Iterable<org.apache.bookkeeper.client.api.LedgerEntry> iterable) {
+ public void onSuccess(LedgerEntries entries) {
cb.readComplete(
Code.OK,
LedgerHandle.this,
IteratorUtils.asEnumeration(
- Iterators.transform(iterable.iterator(), le ->
{
+ Iterators.transform(entries.iterator(), le -> {
LedgerEntry entry = new
LedgerEntry((LedgerEntryImpl) le);
le.close();
return entry;
@@ -719,8 +720,8 @@ public class LedgerHandle implements WriteHandle {
}
}
- CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
readEntriesInternalAsync(long firstEntry,
-
long lastEntry) {
+ CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
+ long lastEntry) {
PendingReadOp op = new PendingReadOp(this, bk.getScheduler(),
firstEntry, lastEntry);
if(!bk.isClosed()) {
bk.getMainWorkerPool().submitOrdered(ledgerId, op);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index e31c5b7..11b8038 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -35,7 +35,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.*;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -59,7 +60,7 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
private final ScheduledExecutorService scheduler;
private ScheduledFuture<?> speculativeTask = null;
protected final List<LedgerEntryRequest> seq;
- private final CompletableFuture<Iterable<LedgerEntry>> future;
+ private final CompletableFuture<LedgerEntries> future;
Set<BookieSocketAddress> heardFromHosts;
BitSet heardFromHostsBitSet;
LedgerHandle lh;
@@ -454,7 +455,7 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
readOpLogger = lh.bk.getReadOpLogger();
}
- CompletableFuture<Iterable<LedgerEntry>> future() {
+ CompletableFuture<LedgerEntries> future() {
return future;
}
@@ -600,7 +601,7 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
future.completeExceptionally(BKException.create(code));
} else {
readOpLogger.registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
- future.complete(Lists.transform(seq, input -> input.entryImpl));
+ future.complete(LedgerEntriesImpl.create(Lists.transform(seq,
input -> input.entryImpl)));
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntries.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntries.java
new file mode 100644
index 0000000..5bcb48b
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntries.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.util.Iterator;
+
+/**
+ * Interface to wrap the entries.
+ *
+ * @since 4.6
+ */
+public interface LedgerEntries extends AutoCloseable {
+
+ /**
+ * Gets a specific LedgerEntry by entryId.
+ *
+ * @param entryId the LedgerEntry id
+ * @return the LedgerEntry, null if no LedgerEntry with such entryId.
+ */
+ LedgerEntry getEntry(long entryId);
+
+ /**
+ * Calling this method does not modify the reference count of the ByteBuf
in the returned LedgerEntry objects.
+ * The caller who calls {@link #iterator()} should make sure that they do
not call ByteBuf.release() on the
+ * LedgerEntry objects to avoid a double free.
+ * All reference counts will be decremented when the containing
LedgerEntries object is closed.
+ *
+ * @return the iterator of type LedgerEntry.
+ */
+ Iterator<LedgerEntry> iterator();
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
index cea0f8e..4227e82 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java
@@ -42,7 +42,7 @@ public interface ReadHandle extends Handle {
* id of last entry of sequence, inclusive
* @return an handle to the result of the operation
*/
- CompletableFuture<Iterable<LedgerEntry>> read(long firstEntry, long
lastEntry);
+ CompletableFuture<LedgerEntries> read(long firstEntry, long lastEntry);
/**
* Read a sequence of entries asynchronously, allowing to read after the
LastAddConfirmed range.
@@ -67,7 +67,7 @@ public interface ReadHandle extends Handle {
* @see #read(long, long)
* @see #readLastAddConfirmed()
*/
- CompletableFuture<Iterable<LedgerEntry>> readUnconfirmed(long firstEntry,
long lastEntry);
+ CompletableFuture<LedgerEntries> readUnconfirmed(long firstEntry, long
lastEntry);
/**
* Obtains asynchronously the last confirmed write from a quorum of
bookies. This
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntriesImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntriesImpl.java
new file mode 100644
index 0000000..e866da4
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntriesImpl.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.impl;
+
+import io.netty.util.Recycler;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * Ledger entries implementation. It is a simple wrap of a list of ledger
entries.
+ */
+public class LedgerEntriesImpl implements LedgerEntries {
+ private List<LedgerEntry> entries;
+ private final Recycler.Handle<LedgerEntriesImpl> recyclerHandle;
+
+ private LedgerEntriesImpl(Recycler.Handle<LedgerEntriesImpl>
recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<LedgerEntriesImpl> RECYCLER = new
Recycler<LedgerEntriesImpl>() {
+ @Override
+ protected LedgerEntriesImpl
newObject(Recycler.Handle<LedgerEntriesImpl> handle) {
+ return new LedgerEntriesImpl(handle);
+ }
+ };
+
+ private void recycle() {
+ releaseByteBuf();
+ recyclerHandle.recycle(this);
+ }
+
+ private void releaseByteBuf() {
+ if (entries != null) {
+ entries.forEach(LedgerEntry::close);
+ entries.clear();
+ entries = null;
+ }
+ }
+
+ /**
+ * Create ledger entries.
+ *
+ * @param entries the entries with ordering
+ * @return the LedgerEntriesImpl
+ */
+ public static LedgerEntriesImpl create(List<LedgerEntry> entries) {
+ checkArgument(!entries.isEmpty(), "entries for create should not be
empty.");
+ LedgerEntriesImpl ledgerEntries = RECYCLER.get();
+ ledgerEntries.entries = entries;
+ return ledgerEntries;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public LedgerEntry getEntry(long entryId) {
+ checkNotNull(entries, "entries has been recycled");
+ long firstId = entries.get(0).getEntryId();
+ long lastId = entries.get(entries.size() - 1).getEntryId();
+ if (entryId < firstId || entryId > lastId) {
+ throw new IndexOutOfBoundsException("required index: " + entryId +
+ " is out of bounds: [ " + firstId + ", " + lastId + " ].");
+ }
+ return entries.get((int) (entryId - firstId));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Iterator<LedgerEntry> iterator() {
+ checkNotNull(entries, "entries has been recycled");
+ return entries.iterator();
+ }
+
+ @Override
+ public void close(){
+ recycle();
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index 0f3365e..ff3a776 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -78,9 +78,7 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
PendingReadOp readOp =
new PendingReadOp(lh, lh.bk.scheduler, i, i);
readOp.parallelRead(true).submit();
- Iterable<LedgerEntry> iterable = readOp.future().get();
- assertNotNull(iterable);
- Iterator<LedgerEntry> entries = iterable.iterator();
+ Iterator<LedgerEntry> entries = readOp.future().get().iterator();
assertTrue(entries.hasNext());
LedgerEntry entry = entries.next();
assertNotNull(entry);
@@ -93,9 +91,7 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
PendingReadOp readOp =
new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
readOp.parallelRead(true).submit();
- Iterable<LedgerEntry> iterable = readOp.future().get();
- assertNotNull(iterable);
- Iterator<LedgerEntry> iterator = iterable.iterator();
+ Iterator<LedgerEntry> iterator = readOp.future().get().iterator();
int numReads = 0;
while (iterator.hasNext()) {
@@ -198,9 +194,7 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
PendingReadOp readOp =
new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
readOp.parallelRead(true).submit();
- Iterable<LedgerEntry> iterable = readOp.future().get();
- assertNotNull(iterable);
- Iterator<LedgerEntry> entries = iterable.iterator();
+ Iterator<LedgerEntry> entries = readOp.future().get().iterator();
int numReads = 0;
while (entries.hasNext()) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 0350720..bdb26a4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.BKException.BKDuplicateEntryIdException;
@@ -275,11 +276,12 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
result(newDeleteLedgerOp().withLedgerId(lId).execute());
}
- private static void checkEntries(Iterable<LedgerEntry> entries, byte[]
data)
+ private static void checkEntries(LedgerEntries entries, byte[] data)
throws InterruptedException, BKException {
- for (LedgerEntry le : entries) {
- assertArrayEquals(data, le.getEntryBytes());
+ Iterator<LedgerEntry> iterator = entries.iterator();
+ while(iterator.hasNext()) {
+ LedgerEntry entry = iterator.next();
+ assertArrayEquals(data, entry.getEntryBytes());
}
}
-
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/impl/LedgerEntriesImplTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/impl/LedgerEntriesImplTest.java
new file mode 100644
index 0000000..2339848
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/impl/LedgerEntriesImplTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.client.impl;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link LedgerEntriesImpl}.
+ */
+public class LedgerEntriesImplTest {
+ private final int entryNumber = 7;
+ private LedgerEntriesImpl ledgerEntriesImpl;
+ private final List<LedgerEntry> entryList = Lists.newArrayList();
+
+ // content for each entry
+ private final long ledgerId = 1234L;
+ private final long entryId = 5678L;
+ private final long length = 9876L;
+ private final byte[] dataBytes = "test-ledger-entry-impl".getBytes(UTF_8);
+ private final ArrayList<ByteBuf> bufs =
Lists.newArrayListWithExpectedSize(entryNumber);
+
+ public LedgerEntriesImplTest () {
+ for(int i = 0; i < entryNumber; i++) {
+ ByteBuf buf = Unpooled.wrappedBuffer(dataBytes);
+ bufs.add(buf);
+
+ entryList.add(LedgerEntryImpl.create(ledgerId + i,
+ entryId + i,
+ length + i,
+ buf));
+ }
+
+ ledgerEntriesImpl = LedgerEntriesImpl.create(entryList);
+ }
+
+ @After
+ public void tearDown() {
+ ledgerEntriesImpl.close();
+
+ // References should be released after close.
+ bufs.forEach(byteBuf -> assertEquals(0, byteBuf.refCnt()));
+
+ try {
+ ledgerEntriesImpl.getEntry(entryId);
+ fail("should fail getEntry after close");
+ } catch (NullPointerException e) {
+ // expected behavior
+ }
+
+ try {
+ ledgerEntriesImpl.iterator();
+ fail("should fail iterator after close");
+ } catch (NullPointerException e) {
+ // expected behavior
+ }
+ }
+
+ @Test
+ public void testGetEntry() {
+ for(int i = 0; i < entryNumber; i ++) {
+ LedgerEntry entry = ledgerEntriesImpl.getEntry(entryId + i);
+ assertEquals(entryList.get(i).getLedgerId(), entry.getLedgerId());
+ assertEquals(entryList.get(i).getEntryId(), entry.getEntryId());
+ assertEquals(entryList.get(i).getLength(), entry.getLength());
+
+ ByteBuf buf = entry.getEntryBuffer();
+ byte[] content = new byte[buf.readableBytes()];
+ buf.readBytes(content);
+ assertArrayEquals(dataBytes, content);
+
+ assertEquals(1, entry.getEntryBuffer().refCnt());
+ }
+
+ try {
+ LedgerEntry entry = ledgerEntriesImpl.getEntry(entryId - 1);
+ fail("Should get IndexOutOfBoundsException");
+ } catch (IndexOutOfBoundsException e) {
+ // expected behavior
+ }
+
+ try {
+ LedgerEntry entry = ledgerEntriesImpl.getEntry(entryId +
entryNumber);
+ fail("Should get IndexOutOfBoundsException");
+ } catch (IndexOutOfBoundsException e) {
+ // expected behavior
+ }
+ }
+
+ @Test
+ public void testIterator() {
+ Iterator<LedgerEntry> entryIterator = ledgerEntriesImpl.iterator();
+ entryIterator.forEachRemaining(ledgerEntry -> assertEquals(1,
ledgerEntry.getEntryBuffer().refCnt()));
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].