This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b59d63e Add missed tests for SortedLedgerStorage
b59d63e is described below
commit b59d63e8be237e80872f73d92a31b4213d9a649b
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 4 08:24:28 2017 +0200
Add missed tests for SortedLedgerStorage
Added two missed tests for SortedLedgerStorage. It was missed when we
(twitter) merged the SortedLedgerStorage back.
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli, Jia Zhai
This closes #227 from sijie/more_memory_table_related_tests
---
.../java/org/apache/bookkeeper/bookie/LogMark.java | 2 +
.../bookkeeper/bookie/TestEntryMemTable.java | 263 +++++++++++++++++++++
.../bookkeeper/bookie/TestSkipListArena.java | 202 ++++++++++++++++
3 files changed, 467 insertions(+)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
index 4bf1e05..7aa2850 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
@@ -28,6 +28,8 @@ class LogMark {
long logFileId;
long logFileOffset;
+ public static final LogMark MAX_VALUE = new LogMark(Long.MAX_VALUE,
Long.MAX_VALUE);
+
public LogMark() {
setLogMark(0, 0);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
new file mode 100644
index 0000000..0ca108e
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashSet;
+
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+import org.junit.Before;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+public class TestEntryMemTable implements CacheCallback, SkipListFlusher,
CheckpointSource {
+
+ private EntryMemTable memTable;
+ private final Random random = new Random();
+ private TestCheckPoint curCheckpoint = new TestCheckPoint(0, 0);
+
+ @Override
+ public Checkpoint newCheckpoint() {
+ return curCheckpoint;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+ throws IOException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.memTable = new
EntryMemTable(TestBKConfiguration.newServerConfiguration(),
+ this, NullStatsLogger.INSTANCE);
+ }
+
+ @Test
+ public void testLogMark() throws IOException {
+ LogMark mark = new LogMark();
+ assertTrue(mark.compare(new LogMark()) == 0);
+ assertTrue(mark.compare(LogMark.MAX_VALUE) < 0);
+ mark.setLogMark(3, 11);
+ byte[] data = new byte[16];
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ mark.writeLogMark(buf);
+ buf.flip();
+ LogMark mark1 = new LogMark(9, 13);
+ assertTrue(mark1.compare(mark) > 0);
+ mark1.readLogMark(buf);
+ assertTrue(mark1.compare(mark) == 0);
+ }
+
+ /**
+ * Basic put/get
+ * @throws IOException
+ * */
+ @Test
+ public void testBasicOps() throws IOException {
+ long ledgerId = 1;
+ long entryId = 1;
+ byte[] data = new byte[10];
+ random.nextBytes(data);
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ memTable.addEntry(ledgerId, entryId, buf, this);
+ buf.rewind();
+ EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
+ assertTrue(kv.getLedgerId() == ledgerId);
+ assertTrue(kv.getEntryId() == entryId);
+ assertTrue(kv.getValueAsByteBuffer().nioBuffer().equals(buf));
+ memTable.flush(this);
+ }
+
+ @Override
+ public void onSizeLimitReached() throws IOException {
+ // No-op
+ }
+
+ public void process(long ledgerId, long entryId, ByteBuffer entry)
+ throws IOException {
+ // No-op
+ }
+
+ /**
+ * Test read/write across snapshot
+ * @throws IOException
+ */
+ @Test
+ public void testScanAcrossSnapshot() throws IOException {
+ byte[] data = new byte[10];
+ List<EntryKeyValue> keyValues = new ArrayList<EntryKeyValue>();
+ for (long entryId = 1; entryId < 100; entryId++) {
+ for (long ledgerId = 1; ledgerId < 3; ledgerId++) {
+ random.nextBytes(data);
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data),
this);
+ keyValues.add(memTable.getEntry(ledgerId, entryId));
+ if (random.nextInt(16) == 0) {
+ memTable.snapshot();
+ }
+ }
+ }
+
+ for (EntryKeyValue kv : keyValues) {
+ assertTrue(memTable.getEntry(kv.getLedgerId(),
kv.getEntryId()).equals(kv));
+ }
+ memTable.flush(this, Checkpoint.MAX);
+ }
+
+ private class KVFLusher implements SkipListFlusher {
+ final HashSet<EntryKeyValue> keyValues;
+
+ KVFLusher(final HashSet<EntryKeyValue> keyValues) {
+ this.keyValues = keyValues;
+ }
+
+ @Override
+ public void process(long ledgerId, long entryId, ByteBuffer entry)
throws IOException {
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in store!",
+ keyValues.add(new EntryKeyValue(ledgerId, entryId,
entry.array())));
+ }
+ }
+
+ private class NoLedgerFLusher implements SkipListFlusher {
+ @Override
+ public void process(long ledgerId, long entryId, ByteBuffer entry)
throws IOException {
+ throw new NoLedgerException(ledgerId);
+ }
+ }
+
+ /**
+ * Test flush w/ logMark parameter
+ * @throws IOException
+ */
+ @Test
+ public void testFlushLogMark() throws IOException {
+ HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+ KVFLusher flusher = new KVFLusher(flushedKVs);
+
+ curCheckpoint.setCheckPoint(2, 2);
+
+ byte[] data = new byte[10];
+ long ledgerId = 100;
+ for (long entryId = 1; entryId < 100; entryId++) {
+ random.nextBytes(data);
+ memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this);
+ }
+
+ assertNull(memTable.snapshot(new TestCheckPoint(1, 1)));
+ assertNotNull(memTable.snapshot(new TestCheckPoint(3, 3)));
+
+ assertTrue(0 < memTable.flush(flusher));
+ assertTrue(0 == memTable.flush(flusher));
+
+ curCheckpoint.setCheckPoint(4, 4);
+
+ random.nextBytes(data);
+ memTable.addEntry(ledgerId, 101, ByteBuffer.wrap(data), this);
+ assertTrue(0 == memTable.flush(flusher));
+
+ assertTrue(0 == memTable.flush(flusher, new TestCheckPoint(3, 3)));
+ assertTrue(0 < memTable.flush(flusher, new TestCheckPoint(4, 5)));
+ }
+
+ /**
+ * Test snapshot/flush interaction
+ * @throws IOException
+ */
+ @Test
+ public void testFlushSnapshot() throws IOException {
+ HashSet<EntryKeyValue> keyValues = new HashSet<EntryKeyValue>();
+ HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+ KVFLusher flusher = new KVFLusher(flushedKVs);
+
+ byte[] data = new byte[10];
+ for (long entryId = 1; entryId < 100; entryId++) {
+ for (long ledgerId = 1; ledgerId < 100; ledgerId++) {
+ random.nextBytes(data);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in
mem-table!",
+ memTable.addEntry(ledgerId, entryId,
ByteBuffer.wrap(data), this) != 0);
+ assertTrue(ledgerId + ":" + entryId + " is duplicate in
hash-set!",
+ keyValues.add(memTable.getEntry(ledgerId, entryId)));
+ if (random.nextInt(16) == 0) {
+ if (null != memTable.snapshot()) {
+ if (random.nextInt(2) == 0) {
+ memTable.flush(flusher);
+ }
+ }
+ }
+ }
+ }
+
+ memTable.flush(flusher, Checkpoint.MAX);
+ for (EntryKeyValue kv : keyValues) {
+ assertTrue("kv " + kv.toString() + " was not flushed!",
flushedKVs.contains(kv));
+ }
+ }
+
+ /**
+ * Test NoLedger exception/flush interaction
+ * @throws IOException
+ */
+ @Test
+ public void testNoLedgerException() throws IOException {
+ NoLedgerFLusher flusher = new NoLedgerFLusher();
+
+ byte[] data = new byte[10];
+ for (long entryId = 1; entryId < 100; entryId++) {
+ for (long ledgerId = 1; ledgerId < 100; ledgerId++) {
+ random.nextBytes(data);
+ if (random.nextInt(16) == 0) {
+ if (null != memTable.snapshot()) {
+ memTable.flush(flusher);
+ }
+ }
+ }
+ }
+
+ memTable.flush(flusher, Checkpoint.MAX);
+ }
+
+ private static class TestCheckPoint implements Checkpoint {
+
+ LogMark mark;
+
+ public TestCheckPoint(long fid, long fpos) {
+ mark = new LogMark(fid, fpos);
+ }
+
+ private void setCheckPoint(long fid, long fpos) {
+ mark.setLogMark(fid, fpos);
+ }
+
+ @Override
+ public int compareTo(Checkpoint o) {
+ if (Checkpoint.MAX == o) {
+ return -1;
+ }
+ return mark.compare(((TestCheckPoint)o).mark);
+ }
+
+ }
+}
+
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSkipListArena.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSkipListArena.java
new file mode 100644
index 0000000..e42eacc
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSkipListArena.java
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.bookie.SkipListArena.MemorySlice;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.TreeMap;
+import com.google.common.primitives.Ints;
+
+public class TestSkipListArena {
+
+ class CustomConfiguration extends ServerConfiguration {
+ @Override
+ public int getSkipListArenaChunkSize() {
+ return 4096;
+ }
+ @Override
+ public int getSkipListArenaMaxAllocSize() {
+ return 1024;
+ }
+ @Override
+ public boolean getJournalFlushWhenQueueEmpty() {
+ return true;
+ }
+
+ }
+
+ final CustomConfiguration cfg = new CustomConfiguration();
+
+ /**
+ * Test random allocations
+ */
+ @Test
+ public void testRandomAllocation() {
+ Random rand = new Random();
+ SkipListArena arena = new SkipListArena(cfg);
+ int expectedOff = 0;
+ byte[] lastBuffer = null;
+
+ // 10K iterations by 0-512 alloc -> 2560kB expected
+ // should be reasonable for unit test and also cover wraparound
+ // behavior
+ for (int i = 0; i < 10000; i++) {
+ int size = rand.nextInt(512);
+ MemorySlice alloc = arena.allocateBytes(size);
+
+ if (alloc.getData() != lastBuffer) {
+ expectedOff = 0;
+ lastBuffer = alloc.getData();
+ }
+ assertTrue(expectedOff == alloc.getOffset());
+ assertTrue("Allocation " + alloc + " overruns buffer",
+ alloc.getOffset() + size <= alloc.getData().length);
+ expectedOff += size;
+ }
+ }
+
+ @Test
+ public void testLargeAllocation() {
+ SkipListArena arena = new SkipListArena(cfg);
+ MemorySlice alloc = arena.allocateBytes(1024 + 1024);
+ assertNull("2KB allocation shouldn't be satisfied by LAB.", alloc);
+ }
+
+ private class ByteArray {
+ final byte[] bytes;
+ ByteArray(final byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int hashCode() {
+ return bytes.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof ByteArray) {
+ ByteArray other = (ByteArray)object;
+ return this.bytes.equals(other.bytes);
+ }
+ return false;
+ }
+ }
+
+ private static class AllocBuffer implements Comparable<AllocBuffer>{
+ private final MemorySlice alloc;
+ private final int size;
+ public AllocBuffer(MemorySlice alloc, int size) {
+ super();
+ this.alloc = alloc;
+ this.size = size;
+ }
+
+ @Override
+ public int compareTo(AllocBuffer e) {
+ assertTrue(alloc.getData() == e.alloc.getData());
+ return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
+ }
+
+ @Override
+ public String toString() {
+ return alloc + ":" + size;
+ }
+ }
+
+ private Thread getAllocThread(final ConcurrentLinkedQueue<AllocBuffer>
queue,
+ final CountDownLatch latch,
+ final SkipListArena arena) {
+ return new Thread(new Runnable() {
+ @Override
+ public void run() {
+ Random rand = new Random();
+ for (int j = 0; j < 1000; j++) {
+ int size = rand.nextInt(512);
+ MemorySlice alloc = arena.allocateBytes(size);
+ queue.add(new AllocBuffer(alloc, size));
+ }
+ latch.countDown();
+ }
+ });
+ }
+
+ /**
+ * Test concurrent allocation, check the results don't overlap
+ */
+ @Test
+ public void testConcurrency() throws Exception {
+ final SkipListArena arena = new SkipListArena(cfg);
+ final CountDownLatch latch = new CountDownLatch(10);
+ final ConcurrentLinkedQueue<AllocBuffer> queue = new
ConcurrentLinkedQueue<AllocBuffer>();
+
+ Set<Thread> testThreads = new HashSet<Thread>();
+ for (int i = 0; i < 10; i++) {
+ testThreads.add(getAllocThread(queue, latch, arena));
+ }
+
+ for (Thread thread : testThreads) {
+ thread.start();
+ }
+ latch.await();
+
+ // Partition the allocations by the actual byte[] they share,
+ // make sure offsets are unique and non-overlap for each buffer.
+ Map<ByteArray, Map<Integer, AllocBuffer>> mapsByArray = new
HashMap<ByteArray, Map<Integer, AllocBuffer>>();
+ boolean overlapped = false;
+
+ final AllocBuffer[] buffers = queue.toArray(new AllocBuffer[0]);
+ for (AllocBuffer buf : buffers) {
+ if (buf.size != 0) {
+ ByteArray ptr = new ByteArray(buf.alloc.getData());
+ Map<Integer, AllocBuffer> tree_map = mapsByArray.get(ptr);
+ if (tree_map == null) {
+ tree_map = new TreeMap<Integer, AllocBuffer>();
+ mapsByArray.put(ptr, tree_map);
+ }
+ AllocBuffer other = tree_map.put(new
Integer(buf.alloc.getOffset()), buf);
+ if (other != null) {
+ fail("Buffer " + other.toString() + " overlapped with " +
buf.toString());
+ }
+ }
+ }
+
+ // Now check each byte array to make sure allocations don't overlap
+ for (Map<Integer, AllocBuffer> tree_map : mapsByArray.values()) {
+ int expectedOff = 0;
+ for (AllocBuffer buf : tree_map.values()) {
+ assertEquals(expectedOff, buf.alloc.getOffset());
+ expectedOff += buf.size;
+ }
+ }
+ }
+}
+
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].