This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e624156  Issue #570: getting rid of unnecessary synchronization in 
InterleavedLedgerStorage
e624156 is described below

commit e62415676f787c8db243a7bca7feec2dc7de565b
Author: cguttapalem <cguttapa...@salesforce.com>
AuthorDate: Thu Mar 8 22:16:46 2018 -0800

    Issue #570: getting rid of unnecessary synchronization in 
InterleavedLedgerStorage
    
    Descriptions of the changes in this PR:
    
    This is < sub-task1 > of Issue #570.
    
    In InterleavedLedgerStorage, since the initial version of it, addEntry and 
processEntry methods are synchronized, but they are not required to be 
synchronized.
    
    Removal of unnecessary synchronization in InterleavedLedgerStorage methods, 
as described in 
http://mail-archives.apache.org/mod_mbox/bookkeeper-dev/201707.mbox/%3CCAO2yDyZ946fp2S_qR2iL178hPiXgrnFGb%3DpvkyK4ReSYAtNLBw%40mail.gmail.com%3E
    
    Master Issue: #570
    
    Author: cguttapalem <cguttapa...@salesforce.com>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli 
<eolive...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #1225 from reddycharan/nonsynchaddinterleavedstorage, closes 
#570
---
 .../bookie/InterleavedLedgerStorage.java           |  13 +-
 .../bookkeeper/meta/LedgerManagerFactory.java      |   2 +-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 169 ++++++++++++++++++++-
 3 files changed, 175 insertions(+), 9 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index fdfef0a..81de730 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -74,7 +76,7 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
     GarbageCollectorThread gcThread;
 
     // this indicates that a write has happened since the last flush
-    private volatile boolean somethingWritten = false;
+    private final AtomicBoolean somethingWritten = new AtomicBoolean(false);
 
     // Expose Stats
     private OpStatsLogger getOffsetStats;
@@ -264,7 +266,7 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
 
 
     @Override
-    public synchronized long addEntry(ByteBuf entry) throws IOException {
+    public long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         long lac = entry.getLong(entry.readerIndex() + 16);
@@ -360,10 +362,9 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
 
     @Override
     public synchronized void flush() throws IOException {
-        if (!somethingWritten) {
+        if (!somethingWritten.compareAndSet(true, false)) {
             return;
         }
-        somethingWritten = false;
         flushOrCheckpoint(false);
     }
 
@@ -419,12 +420,12 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
         processEntry(ledgerId, entryId, entry, true);
     }
 
-    protected synchronized void processEntry(long ledgerId, long entryId, 
ByteBuf entry, boolean rollLog)
+    protected void processEntry(long ledgerId, long entryId, ByteBuf entry, 
boolean rollLog)
             throws IOException {
         /*
          * Touch dirty flag
          */
-        somethingWritten = true;
+        somethingWritten.set(true);
 
         /*
          * Log the entry
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 8568617..80d3a65 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -94,7 +94,7 @@ public interface LedgerManagerFactory extends AutoCloseable {
      * @param lm
      *            Layout manager
      */
-    void format(final AbstractConfiguration<?> conf, final LayoutManager lm)
+    void format(AbstractConfiguration<?> conf, LayoutManager lm)
             throws InterruptedException, KeeperException, IOException;
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index c60a997..d4a98ce 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -36,8 +36,13 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -106,8 +111,8 @@ public class EntryLogTest {
         assertTrue(meta.getLedgersMap().containsKey(3L));
     }
 
-    private ByteBuf generateEntry(long ledger, long entry) {
-        byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+    private static ByteBuf generateEntry(long ledger, long entry) {
+        byte[] data = generateDataString(ledger, entry).getBytes();
         ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
         bb.writeLong(ledger);
         bb.writeLong(entry);
@@ -115,6 +120,10 @@ public class EntryLogTest {
         return bb;
     }
 
+    private static String generateDataString(long ledger, long entry) {
+        return ("ledger-" + ledger + "-" + entry);
+    }
+
     @Test
     public void testMissingLogId() throws Exception {
         File tmpDir = createTempDir("entryLogTest", ".dir");
@@ -391,4 +400,160 @@ public class EntryLogTest {
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), 
logger.getEntryLogsSet());
     }
+
+    static class LedgerStorageWriteTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageWriteTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for AddEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageFlushTask implements Callable<Boolean> {
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageFlushTask(LedgerStorage ledgerStorage) {
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ledgerStorage.flush();
+            } catch (IOException e) {
+                LOG.error("Got Exception for flush call", e);
+                throw new IOException("Got Exception for Flush call", e);
+            }
+            return true;
+        }
+    }
+
+    static class LedgerStorageReadTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        LedgerStorage ledgerStorage;
+
+        LedgerStorageReadTask(long ledgerId, int entryId, LedgerStorage 
ledgerStorage) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.ledgerStorage = ledgerStorage;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+                ByteBuf actualByteBuf = ledgerStorage.getEntry(ledgerId, 
entryId);
+                if (!expectedByteBuf.equals(actualByteBuf)) {
+                    LOG.error("Expected Entry: {} Actual Entry: {}", 
expectedByteBuf.toString(Charset.defaultCharset()),
+                            actualByteBuf.toString(Charset.defaultCharset()));
+                    throw new IOException("Expected Entry: " + 
expectedByteBuf.toString(Charset.defaultCharset())
+                            + " Actual Entry: " + 
actualByteBuf.toString(Charset.defaultCharset()));
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + 
ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for GetEntry call. 
LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            return true;
+        }
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read
+     * operations using InterleavedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() 
throws Exception {
+        File ledgerDir = createTempDir("bkTest", ".dir");
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(ledgerDir.toString());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        Bookie bookie = new Bookie(conf);
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) 
bookie.ledgerStorage);
+        Random rand = new Random(0);
+
+        int numOfLedgers = 70;
+        int numEntries = 1500;
+        // Create ledgers
+        for (int i = 0; i < numOfLedgers; i++) {
+            ledgerStorage.setMasterKey(i, "key".getBytes());
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        List<Callable<Boolean>> writeAndFlushTasks = new 
ArrayList<Callable<Boolean>>();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfLedgers; i++) {
+                writeAndFlushTasks.add(new LedgerStorageWriteTask(i, j, 
ledgerStorage));
+            }
+        }
+
+        /*
+         * add some flush tasks to the list of writetasks list.
+         */
+        for (int i = 0; i < (numOfLedgers * numEntries) / 500; i++) {
+            writeAndFlushTasks.add(rand.nextInt(writeAndFlushTasks.size()), 
new LedgerStorageFlushTask(ledgerStorage));
+        }
+
+        // invoke all those write/flush tasks all at once concurrently
+        executor.invokeAll(writeAndFlushTasks).forEach((future) -> {
+            try {
+                future.get();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Write/Flush task failed because of 
InterruptedException", ie);
+                Assert.fail("Write/Flush task interrupted");
+            } catch (Exception ex) {
+                LOG.error("Write/Flush task failed because of  exception", ex);
+                Assert.fail("Write/Flush task failed " + ex.getMessage());
+            }
+        });
+
+        List<Callable<Boolean>> readAndFlushTasks = new 
ArrayList<Callable<Boolean>>();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfLedgers; i++) {
+                readAndFlushTasks.add(new LedgerStorageReadTask(i, j, 
ledgerStorage));
+            }
+        }
+
+        /*
+         * add some flush tasks to the list of readtasks list.
+         */
+        for (int i = 0; i < (numOfLedgers * numEntries) / 500; i++) {
+            readAndFlushTasks.add(rand.nextInt(readAndFlushTasks.size()), new 
LedgerStorageFlushTask(ledgerStorage));
+        }
+
+        // invoke all those read/flush tasks all at once concurrently
+        executor.invokeAll(readAndFlushTasks).forEach((future) -> {
+            try {
+                future.get();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Read/Flush task failed because of 
InterruptedException", ie);
+                Assert.fail("Read/Flush task interrupted");
+            } catch (Exception ex) {
+                LOG.error("Read/Flush task failed because of  exception", ex);
+                Assert.fail("Read/Flush task failed " + ex.getMessage());
+            }
+        });
+
+        executor.shutdownNow();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to