DL-101: Improve session-expiry handling when fetching log segments for readers

This change focuses on improving session-expiry handling when fetching log 
segments for readers.

Log segment management in DL is now split across three components (a short 
sketch of how they interact follows the list):

- a LogSegmentMetadataStore (one per namespace instance): it fetches log 
segments from the log segment metadata store (ZooKeeper). It does not do any 
caching.
- a LogSegmentMetadataCache (one per namespace instance): a Guava-cache-based 
metadata cache. It maintains a mapping from log segment metadata path to log 
segment metadata, caching the log segments accessed within this namespace 
instance. It does not manage the sequence of the log segments for streams.
- a PerStreamLogSegmentCache (one per BKLogHandler): a per-stream log segment 
cache that maintains the sequence of the log segments for that stream.
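
Roughly, the split looks like the following. This is a minimal, self-contained 
sketch with stand-in types (the class names, fields, and the size bound are 
made up, not the actual DistributedLog classes; Guava is assumed only because 
the metadata cache is described as Guava-based). It only illustrates that the 
namespace-wide cache is a lossy path-to-metadata map while the per-stream cache 
keeps a complete, ordered view of one stream's segments:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Stand-in types only; not the DistributedLog code, just the shape of the
    // split between the namespace-wide metadata cache and the per-stream cache.
    public class LogSegmentCachesSketch {

        // stand-in for LogSegmentMetadata
        static final class SegmentMeta {
            final long sequenceNumber;   // position of the segment in the stream
            final String zkPath;         // metadata path in ZooKeeper
            SegmentMeta(long sequenceNumber, String zkPath) {
                this.sequenceNumber = sequenceNumber;
                this.zkPath = zkPath;
            }
        }

        // namespace-wide: metadata path -> metadata; entries may be evicted at
        // any time, a miss just means another read from ZooKeeper
        private final Cache<String, SegmentMeta> metadataCache =
                CacheBuilder.newBuilder().maximumSize(10000).build();

        // per stream: sequence number -> metadata; must stay complete and ordered
        // so readers see the segments in sequence, without gaps
        private final ConcurrentNavigableMap<Long, SegmentMeta> perStreamCache =
                new ConcurrentSkipListMap<Long, SegmentMeta>();

        void add(SegmentMeta meta) {
            metadataCache.put(meta.zkPath, meta);
            perStreamCache.put(meta.sequenceNumber, meta);
        }

        void remove(SegmentMeta meta) {
            metadataCache.invalidate(meta.zkPath);
            perStreamCache.remove(meta.sequenceNumber);
        }
    }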

BKLogWriteHandler does not watch for log segment changes. It fetches a minimal 
number of log segments when it is created and fetches the full list of log 
segments only for truncation. Newly rolled log segments are added to the 
per-stream log segment cache directly (see the sketch after this paragraph).
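
As a rough illustration of that write-side pattern (hypothetical names and 
signatures, not the real BKLogWriteHandler API): a filtered fetch at creation, 
a full fetch only for truncation, and rolled segments appended directly.

    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.function.Predicate;

    // Hypothetical illustration only; names and signatures are made up.
    class WriteHandlerSketch {

        interface SegmentStore {
            // returns segment sequence numbers matching the filter, oldest first
            List<Long> fetchSegmentNumbers(Predicate<Long> filter);
        }

        private final SegmentStore store;
        // per-stream view: sequence number -> segment name
        private final NavigableMap<Long, String> perStreamCache = new TreeMap<Long, String>();

        WriteHandlerSketch(SegmentStore store, long lastKnownSequenceNumber) {
            this.store = store;
            // minimal fetch at creation: only the segments the writer may still touch
            for (long seq : store.fetchSegmentNumbers(s -> s >= lastKnownSequenceNumber)) {
                perStreamCache.put(seq, "logsegment_" + seq);
            }
        }

        void onSegmentRolled(long newSequenceNumber) {
            // a newly rolled segment is added to the per-stream cache directly,
            // so no watch on the segment list is needed
            perStreamCache.put(newSequenceNumber, "logsegment_" + newSequenceNumber);
        }

        void truncateBefore(long firstRetainedSequenceNumber) {
            // truncation is the one path that reads the complete list from the store
            for (long seq : store.fetchSegmentNumbers(s -> true)) {
                if (seq < firstRetainedSequenceNumber) {
                    perStreamCache.remove(seq);
                }
            }
        }
    }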

BKLogReadHandler watches for log segment changes and only notifies its 
listeners when the list of log segments actually changes. The session handling 
that is specific to the metadata store is hidden inside the implementations of 
LogSegmentMetadataStore (see the sketch after this paragraph).
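
A minimal sketch of that read-side contract, again with stand-in types 
(LogSegmentNamesListener and getLogSegmentNames mirror the names introduced by 
this change, but the simplified signatures here are assumptions): the reader 
registers a listener once, and whatever the ZooKeeper-backed store does about 
watches and expired sessions stays inside the store implementation.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Stand-in interfaces; simplified relative to the real LogSegmentMetadataStore.
    class ReadHandlerSketch {

        interface LogSegmentNamesListener {
            void onSegmentNamesUpdated(List<String> segmentNames);
        }

        interface SegmentMetadataStore {
            // fetch the current segment names and keep the listener registered;
            // the implementation re-establishes its ZooKeeper watches after a
            // session expires, invisibly to the caller
            void getLogSegmentNames(String logSegmentsPath, LogSegmentNamesListener listener);
        }

        private volatile List<String> lastSeenNames = Collections.emptyList();

        ReadHandlerSketch(SegmentMetadataStore store, String logSegmentsPath) {
            store.getLogSegmentNames(logSegmentsPath, new LogSegmentNamesListener() {
                @Override
                public void onSegmentNamesUpdated(List<String> names) {
                    // only react when the list of log segments actually changed
                    if (!names.equals(lastSeenNames)) {
                        lastSeenNames = new ArrayList<String>(names);
                        // re-read the changed segments and update the per-stream cache here
                    }
                }
            });
        }
    }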

The change also cleans up a bunch of unused methods in 
BKLog{Read,Write}Handler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/9ee7d016
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/9ee7d016
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/9ee7d016

Branch: refs/heads/master
Commit: 9ee7d016072f45060d14af3e5e6ddd75a92285f6
Parents: 3ea8853
Author: Sijie Guo <[email protected]>
Authored: Thu Jul 28 21:10:33 2016 -0700
Committer: Sijie Guo <[email protected]>
Committed: Tue Dec 27 16:49:26 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/AsyncNotification.java       |   4 +-
 .../distributedlog/BKAsyncLogReaderDLSN.java    |  29 +-
 .../distributedlog/BKDistributedLogManager.java |  27 +-
 .../BKDistributedLogNamespace.java              |   5 +
 .../twitter/distributedlog/BKLogHandler.java    | 827 +++++--------------
 .../distributedlog/BKLogReadHandler.java        | 253 +++++-
 .../distributedlog/BKLogWriteHandler.java       | 194 +++--
 .../distributedlog/BKSyncLogReaderDLSN.java     |   4 +-
 .../DistributedLogConfiguration.java            |  71 ++
 .../distributedlog/LogSegmentMetadata.java      |  12 +-
 .../callback/LogSegmentListener.java            |   7 +-
 .../callback/LogSegmentNamesListener.java       |  12 +-
 .../function/CloseAsyncCloseableFunction.java   |  51 ++
 .../function/GetVersionedValueFunction.java     |  39 +
 .../impl/ZKLogSegmentMetadataStore.java         | 158 ++--
 .../logsegment/LogSegmentCache.java             | 228 -----
 .../logsegment/LogSegmentMetadataCache.java     |  98 +++
 .../logsegment/LogSegmentMetadataStore.java     |  17 +-
 .../logsegment/PerStreamLogSegmentCache.java    | 243 ++++++
 .../readahead/ReadAheadWorker.java              | 171 ++--
 .../distributedlog/util/FutureUtils.java        |   5 +-
 .../com/twitter/distributedlog/DLMTestUtil.java |  12 +-
 .../NonBlockingReadsTestUtil.java               |   2 +-
 .../distributedlog/TestAsyncReaderWriter.java   |  16 +-
 .../TestBKDistributedLogManager.java            |  17 +-
 .../distributedlog/TestBKLogReadHandler.java    |  89 +-
 .../twitter/distributedlog/TestReadAhead.java   |   8 -
 .../twitter/distributedlog/TestReadUtils.java   |   8 +-
 .../twitter/distributedlog/admin/TestDLCK.java  |   1 +
 .../admin/TestDistributedLogAdmin.java          |  61 +-
 .../impl/TestZKLogSegmentMetadataStore.java     | 118 ++-
 .../logsegment/TestLogSegmentCache.java         | 220 -----
 .../TestPerStreamLogSegmentCache.java           | 186 +++++
 .../tools/TestDistributedLogTool.java           |   8 +-
 .../exceptions/LogSegmentNotFoundException.java |  32 +
 .../src/main/thrift/service.thrift              |   2 +
 .../distributedlog/service/MonitorService.java  |   5 +
 37 files changed, 1762 insertions(+), 1478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
index e7cb601..bd71147 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
@@ -20,8 +20,10 @@ package com.twitter.distributedlog;
 public interface AsyncNotification {
     /**
      * Triggered when the background activity encounters an exception
+     *
+     * @param reason the exception that encountered.
      */
-    void notifyOnError();
+    void notifyOnError(Throwable reason);
 
     /**
      *  Triggered when the background activity completes an operation

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index b1a9273..90230ae 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -104,7 +104,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, 
Runnable, AsyncNotificatio
 
     private boolean lockStream = false;
 
-    private boolean disableReadAheadZKNotification = false;
+    private boolean disableReadAheadLogSegmentsNotification = false;
 
     private final boolean returnEndOfStreamRecord;
 
@@ -400,23 +400,17 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, 
Runnable, AsyncNotificatio
                         bkLedgerManager.startReadAhead(
                                 new LedgerReadPosition(getStartDLSN()),
                                 failureInjector);
-                        if (disableReadAheadZKNotification) {
-                            bkLedgerManager.disableReadAheadZKNotification();
+                        if (disableReadAheadLogSegmentsNotification) {
+                            
bkLedgerManager.disableReadAheadLogSegmentsNotification();
                         }
                     } catch (Exception exc) {
-                        setLastException(new IOException(exc));
-                        notifyOnError();
+                        notifyOnError(exc);
                     }
                 }
 
                 @Override
                 public void onFailure(Throwable cause) {
-                    if (cause instanceof IOException) {
-                        setLastException((IOException)cause);
-                    } else {
-                        setLastException(new IOException(cause));
-                    }
-                    notifyOnError();
+                    notifyOnError(cause);
                 }
             });
             readAheadStarted = true;
@@ -643,7 +637,12 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, 
Runnable, AsyncNotificatio
      * Triggered when the background activity encounters an exception
      */
     @Override
-    public void notifyOnError() {
+    public void notifyOnError(Throwable cause) {
+        if (cause instanceof IOException) {
+            setLastException((IOException) cause);
+        } else {
+            setLastException(new IOException(cause));
+        }
         scheduleBackgroundRead();
     }
 
@@ -661,9 +660,9 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, 
Runnable, AsyncNotificatio
     }
 
     @VisibleForTesting
-    synchronized void disableReadAheadZKNotification() {
-        disableReadAheadZKNotification = true;
-        bkLedgerManager.disableReadAheadZKNotification();
+    synchronized void disableReadAheadLogSegmentsNotification() {
+        disableReadAheadLogSegmentsNotification = true;
+        bkLedgerManager.disableReadAheadLogSegmentsNotification();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 6a9d860..75a5b83 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
 import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
@@ -32,6 +33,8 @@ import 
com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
+import com.twitter.distributedlog.function.GetVersionedValueFunction;
 import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
@@ -41,6 +44,8 @@ import com.twitter.distributedlog.lock.NopDistributedLock;
 import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
@@ -159,6 +164,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     // log segment metadata stores
     private final LogSegmentMetadataStore writerMetadataStore;
     private final LogSegmentMetadataStore readerMetadataStore;
+    private final LogSegmentMetadataCache logSegmentMetadataCache;
 
     // bookkeeper clients
     // NOTE: The actual bookkeeper client is initialized lazily when it is 
referenced by
@@ -232,6 +238,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
              null,
              null,
              null,
+             new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
              OrderedScheduler.newBuilder().name("BKDL-" + 
name).corePoolSize(1).build(),
              null,
              null,
@@ -293,6 +300,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                             SessionLockFactory lockFactory,
                             LogSegmentMetadataStore writerMetadataStore,
                             LogSegmentMetadataStore readerMetadataStore,
+                            LogSegmentMetadataCache logSegmentMetadataCache,
                             OrderedScheduler scheduler,
                             OrderedScheduler readAheadScheduler,
                             OrderedScheduler lockStateExecutor,
@@ -336,6 +344,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         } else {
             this.readerMetadataStore = readerMetadataStore;
         }
+        this.logSegmentMetadataCache = logSegmentMetadataCache;
 
         // create the bkc for writers
         if (null == writerBKCBuilder) {
@@ -451,7 +460,8 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     private synchronized BKLogReadHandler getReadHandlerForListener(boolean 
create) {
         if (null == readHandlerForListener && create) {
             readHandlerForListener = createReadHandler();
-            readHandlerForListener.scheduleGetLedgersTask(true, true);
+            // start fetch the log segments
+            readHandlerForListener.asyncStartFetchLogSegments();
         }
         return readHandlerForListener;
     }
@@ -463,13 +473,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
 
     protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
         final BKLogReadHandler readHandler = createReadHandler();
-        return readHandler.asyncGetFullLedgerList(true, false).ensure(new 
AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                readHandler.asyncClose();
-                return BoxedUnit.UNIT;
-            }
-        });
+        return readHandler.readLogSegmentsFromStore(
+                LogSegmentMetadata.COMPARATOR,
+                LogSegmentFilter.DEFAULT_FILTER,
+                null)
+                .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC)
+                .ensure(CloseAsyncCloseableFunction.of(readHandler));
     }
 
     @Override
@@ -534,6 +543,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 readerZKCBuilder,
                 readerBKCBuilder,
                 readerMetadataStore,
+                logSegmentMetadataCache,
                 scheduler,
                 lockExecutor,
                 readAheadScheduler,
@@ -634,6 +644,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 writerZKCBuilder,
                 writerBKCBuilder,
                 writerMetadataStore,
+                logSegmentMetadataCache,
                 scheduler,
                 allocator,
                 statsLogger,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 2df1046..0f2c222 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -41,6 +42,7 @@ import 
com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
 import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
 import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.LogMetadataStore;
@@ -301,6 +303,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
     // log metadata store
     private final LogMetadataStore metadataStore;
     // log segment metadata store
+    private final LogSegmentMetadataCache logSegmentMetadataCache;
     private final LogSegmentMetadataStore writerSegmentMetadataStore;
     private final LogSegmentMetadataStore readerSegmentMetadataStore;
     // lock factory
@@ -478,6 +481,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                 new ZKLogSegmentMetadataStore(conf, sharedWriterZKCForDL, 
scheduler);
         this.readerSegmentMetadataStore =
                 new ZKLogSegmentMetadataStore(conf, sharedReaderZKCForDL, 
scheduler);
+        this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, 
Ticker.systemTicker());
 
         LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, 
regionId = {}, federated = {}.",
                 new Object[] { clientId, regionId, 
bkdlConfig.isFederatedNamespace() });
@@ -883,6 +887,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
                 lockFactory,                        /* Lock Factory */
                 writerSegmentMetadataStore,         /* Log Segment Metadata 
Store for DL Writers */
                 readerSegmentMetadataStore,         /* Log Segment Metadata 
Store for DL Readers */
+                logSegmentMetadataCache,            /* Log Segment Metadata 
Cache */
                 scheduler,                          /* DL scheduler */
                 readAheadExecutor,                  /* Read Aheader Executor */
                 lockStateExecutor,                  /* Lock State Executor */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 460de11..2a6e85b 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -19,33 +19,34 @@ package com.twitter.distributedlog;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.callback.LogSegmentListener;
+import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.LogEmptyException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.MetadataException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
 import com.twitter.distributedlog.io.AsyncAbortable;
 import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.logsegment.LogSegmentCache;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
+import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -63,12 +64,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -81,11 +78,6 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p>
  * Those operations are:
  * <ul>
- * <li>force_get_list: force to get the list of log segments.
- * <li>get_list: get the list of the log segments. it might just retrieve from
- * local log segment cache.
- * <li>get_filtered_list: get the filtered list of log segments.
- * <li>get_full_list: get the full list of log segments.
  * <li>get_inprogress_segment: time between the inprogress log segment created 
and
  * the handler read it.
  * <li>get_completed_segment: time between a log segment is turned to 
completed and
@@ -98,7 +90,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * @see BKLogWriteHandler
  * @see BKLogReadHandler
  */
-public abstract class BKLogHandler implements Watcher, AsyncCloseable, 
AsyncAbortable {
+public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
     static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
 
     protected final ZKLogMetadata logMetadata;
@@ -106,41 +98,26 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
     protected final ZooKeeperClient zooKeeperClient;
     protected final BookKeeperClient bookKeeperClient;
     protected final LogSegmentMetadataStore metadataStore;
+    protected final LogSegmentMetadataCache metadataCache;
     protected final int firstNumEntriesPerReadLastRecordScan;
     protected final int maxNumEntriesPerReadLastRecordScan;
     protected volatile long lastLedgerRollingTimeMillis = -1;
     protected final OrderedScheduler scheduler;
     protected final StatsLogger statsLogger;
     protected final AlertStatsLogger alertStatsLogger;
-    private final AtomicBoolean ledgerListWatchSet = new AtomicBoolean(false);
-    private final AtomicBoolean isFullListFetched = new AtomicBoolean(false);
     protected volatile boolean reportGetSegmentStats = false;
     private final String lockClientId;
     protected final AtomicReference<IOException> metadataException = new 
AtomicReference<IOException>(null);
 
-    // listener
-    protected final CopyOnWriteArraySet<LogSegmentListener> listeners =
-            new CopyOnWriteArraySet<LogSegmentListener>();
+    // Maintain the list of log segments per stream
+    protected final PerStreamLogSegmentCache logSegmentCache;
 
-    // Maintain the list of ledgers
-    protected final LogSegmentCache logSegmentCache;
-    protected volatile SyncGetLedgersCallback firstGetLedgersTask = null;
 
-    protected final AsyncNotification notification;
-    // log segment filter
-    protected final LogSegmentFilter filter;
-
-    // zookeeper children watcher
-    private final Watcher getChildrenWatcher;
 
     // trace
     protected final long metadataLatencyWarnThresholdMillis;
 
     // Stats
-    private final OpStatsLogger forceGetListStat;
-    private final OpStatsLogger getListStat;
-    private final OpStatsLogger getFilteredListStat;
-    private final OpStatsLogger getFullListStat;
     private final OpStatsLogger getInprogressSegmentStat;
     private final OpStatsLogger getCompletedSegmentStat;
     private final OpStatsLogger negativeGetInprogressSegmentStat;
@@ -148,96 +125,6 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
     private final OpStatsLogger recoverLastEntryStats;
     private final OpStatsLogger recoverScannedEntriesStats;
 
-    static class SyncGetLedgersCallback implements 
GenericCallback<List<LogSegmentMetadata>> {
-
-        final String path;
-        final boolean allowEmpty;
-        final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final Promise<List<LogSegmentMetadata>> promise =
-                new Promise<List<LogSegmentMetadata>>();
-
-        int rc = KeeperException.Code.APIERROR.intValue();
-
-        SyncGetLedgersCallback(String path, boolean allowEmpty) {
-            this.path = path;
-            this.allowEmpty = allowEmpty;
-        }
-
-        @Override
-        public void operationComplete(int rc, List<LogSegmentMetadata> 
logSegmentMetadatas) {
-            this.rc = rc;
-            if (KeeperException.Code.OK.intValue() == rc) {
-                LOG.debug("Updated ledgers list for {} : {}", path, 
logSegmentMetadatas);
-                promise.setValue(logSegmentMetadatas);
-            } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                if (allowEmpty) {
-                    promise.setValue(new ArrayList<LogSegmentMetadata>(0));
-                } else {
-                    promise.setException(new LogNotFoundException("Log " + 
path + " is not found"));
-                }
-            } else {
-                promise.setException(new MetadataException("Error getting 
ledgers list for " + path));
-            }
-            countDownLatch.countDown();
-        }
-
-        void waitForFinish() throws IOException {
-            try {
-                countDownLatch.await();
-            } catch (InterruptedException e) {
-                throw new DLInterruptedException("Interrupted on getting 
ledgers list for " + path, e);
-            }
-            if (KeeperException.Code.OK.intValue() != rc) {
-                if (KeeperException.Code.NONODE.intValue() == rc) {
-                    if (!allowEmpty) {
-                        throw new LogNotFoundException("Log " + path + " is 
not found");
-                    }
-                } else {
-                    throw new MetadataException("Error getting ledgers list 
for " + path);
-                }
-            }
-        }
-    }
-
-    static class NOPGetLedgersCallback implements 
GenericCallback<List<LogSegmentMetadata>> {
-
-        final String path;
-
-        NOPGetLedgersCallback(String path) {
-            this.path = path;
-        }
-
-        @Override
-        public void operationComplete(int rc, List<LogSegmentMetadata> 
logSegmentMetadatas) {
-            if (KeeperException.Code.OK.intValue() == rc) {
-                LOG.debug("Updated ledgers list : {}", path, 
logSegmentMetadatas);
-            }
-        }
-    }
-
-    class WatcherGetLedgersCallback implements 
GenericCallback<List<LogSegmentMetadata>>, Runnable {
-
-        final String path;
-
-        WatcherGetLedgersCallback(String path) {
-            this.path = path;
-        }
-
-        @Override
-        public void operationComplete(int rc, List<LogSegmentMetadata> 
logSegmentMetadatas) {
-            if (KeeperException.Code.OK.intValue() == rc) {
-                LOG.debug("Updated ledgers list {} : {}", path, 
logSegmentMetadatas);
-            } else {
-                scheduler.schedule(this, conf.getZKRetryBackoffMaxMillis(), 
TimeUnit.MILLISECONDS);
-            }
-        }
-
-        @Override
-        public void run() {
-            asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, 
filter, getChildrenWatcher, this);
-        }
-    }
-
     /**
      * Construct a Bookkeeper journal manager.
      */
@@ -246,11 +133,10 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
                  ZooKeeperClientBuilder zkcBuilder,
                  BookKeeperClientBuilder bkcBuilder,
                  LogSegmentMetadataStore metadataStore,
+                 LogSegmentMetadataCache metadataCache,
                  OrderedScheduler scheduler,
                  StatsLogger statsLogger,
                  AlertStatsLogger alertStatsLogger,
-                 AsyncNotification notification,
-                 LogSegmentFilter filter,
                  String lockClientId) {
         Preconditions.checkNotNull(zkcBuilder);
         Preconditions.checkNotNull(bkcBuilder);
@@ -259,9 +145,9 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         this.scheduler = scheduler;
         this.statsLogger = statsLogger;
         this.alertStatsLogger = alertStatsLogger;
-        this.notification = notification;
-        this.filter = filter;
-        this.logSegmentCache = new LogSegmentCache(metadata.getLogName());
+        this.logSegmentCache = new PerStreamLogSegmentCache(
+                metadata.getLogName(),
+                conf.isLogSegmentSequenceNumberValidationEnabled());
 
         firstNumEntriesPerReadLastRecordScan = 
conf.getFirstNumEntriesPerReadLastRecordScan();
         maxNumEntriesPerReadLastRecordScan = 
conf.getMaxNumEntriesPerReadLastRecordScan();
@@ -269,20 +155,14 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
         this.bookKeeperClient = bkcBuilder.build();
         this.metadataStore = metadataStore;
+        this.metadataCache = metadataCache;
         this.lockClientId = lockClientId;
 
-        this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager()
-                .registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
-
         // Traces
         this.metadataLatencyWarnThresholdMillis = 
conf.getMetadataLatencyWarnThresholdMillis();
 
         // Stats
         StatsLogger segmentsLogger = statsLogger.scope("logsegments");
-        forceGetListStat = segmentsLogger.getOpStatsLogger("force_get_list");
-        getListStat = segmentsLogger.getOpStatsLogger("get_list");
-        getFilteredListStat = 
segmentsLogger.getOpStatsLogger("get_filtered_list");
-        getFullListStat = segmentsLogger.getOpStatsLogger("get_full_list");
         getInprogressSegmentStat = 
segmentsLogger.getOpStatsLogger("get_inprogress_segment");
         getCompletedSegmentStat = 
segmentsLogger.getOpStatsLogger("get_completed_segment");
         negativeGetInprogressSegmentStat = 
segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment");
@@ -306,67 +186,25 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         return lockClientId;
     }
 
-    protected void registerListener(LogSegmentListener listener) {
-        listeners.add(listener);
-    }
-
-    protected void unregisterListener(LogSegmentListener listener) {
-        listeners.remove(listener);
-    }
-
-    protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) 
{
-        for (LogSegmentListener listener : listeners) {
-            List<LogSegmentMetadata> listToReturn =
-                    new ArrayList<LogSegmentMetadata>(segments);
-            Collections.sort(listToReturn, LogSegmentMetadata.DESC_COMPARATOR);
-            listener.onSegmentsUpdated(listToReturn);
-        }
-    }
-
-    protected void scheduleGetAllLedgersTaskIfNeeded() {
-        if (isFullListFetched.get()) {
-            return;
-        }
-        asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, 
LogSegmentFilter.DEFAULT_FILTER,
-                null, new NOPGetLedgersCallback(getFullyQualifiedName()));
-    }
-
-    protected void scheduleGetLedgersTask(boolean watch, boolean allowEmpty) {
-        if (!watch) {
-            ledgerListWatchSet.set(true);
-        }
-        LOG.info("Scheduling get ledgers task for {}, watch = {}.", 
getFullyQualifiedName(), watch);
-        firstGetLedgersTask = new 
SyncGetLedgersCallback(getFullyQualifiedName(), allowEmpty);
-        asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter,
-                watch ? getChildrenWatcher : null, firstGetLedgersTask);
-        LOG.info("Scheduled get ledgers task for {}, watch = {}.", 
getFullyQualifiedName(), watch);
-    }
-
-    protected void waitFirstGetLedgersTaskToFinish() throws IOException {
-        SyncGetLedgersCallback task = firstGetLedgersTask;
-        if (null != task) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Wait first getting ledgers task to finish for {}.", 
getFullyQualifiedName());
-            }
-            task.waitForFinish();
-        }
-    }
-
     public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
         final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
         checkLogStreamExistsAsync().addEventListener(new 
FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
-                asyncGetFullLedgerList(true, true).addEventListener(new 
FutureEventListener<List<LogSegmentMetadata>>() {
+                readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                ).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
 
                     @Override
-                    public void onSuccess(List<LogSegmentMetadata> ledgerList) 
{
-                        if (ledgerList.isEmpty()) {
+                    public void onSuccess(Versioned<List<LogSegmentMetadata>> 
ledgerList) {
+                        if (ledgerList.getValue().isEmpty()) {
                             promise.setException(new LogEmptyException("Log " 
+ getFullyQualifiedName() + " has no records"));
                             return;
                         }
                         Future<LogRecordWithDLSN> firstRecord = null;
-                        for (LogSegmentMetadata ledger : ledgerList) {
+                        for (LogSegmentMetadata ledger : 
ledgerList.getValue()) {
                             if (!ledger.isTruncated() && 
(ledger.getRecordCount() > 0 || ledger.isInProgress())) {
                                 firstRecord = asyncReadFirstUserRecord(ledger, 
DLSN.InitialDLSN);
                                 break;
@@ -399,15 +237,25 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         checkLogStreamExistsAsync().addEventListener(new 
FutureEventListener<Void>() {
             @Override
             public void onSuccess(Void value) {
-                asyncGetFullLedgerListDesc(true, true).addEventListener(new 
FutureEventListener<List<LogSegmentMetadata>>() {
+                readLogSegmentsFromStore(
+                        LogSegmentMetadata.DESC_COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                ).addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
 
                     @Override
-                    public void onSuccess(List<LogSegmentMetadata> ledgerList) 
{
-                        if (ledgerList.isEmpty()) {
-                            promise.setException(new LogEmptyException("Log " 
+ getFullyQualifiedName() + " has no records"));
+                    public void onSuccess(Versioned<List<LogSegmentMetadata>> 
ledgerList) {
+                        if (ledgerList.getValue().isEmpty()) {
+                            promise.setException(
+                                    new LogEmptyException("Log " + 
getFullyQualifiedName() + " has no records"));
                             return;
                         }
-                        asyncGetLastLogRecord(ledgerList.iterator(), promise, 
recover, false, includeEndOfStream);
+                        asyncGetLastLogRecord(
+                                ledgerList.getValue().iterator(),
+                                promise,
+                                recover,
+                                false,
+                                includeEndOfStream);
                     }
 
                     @Override
@@ -537,11 +385,15 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         return checkLogStreamExistsAsync().flatMap(new Function<Void, 
Future<Long>>() {
             public Future<Long> apply(Void done) {
 
-                return asyncGetFullLedgerList(true, false).flatMap(new 
Function<List<LogSegmentMetadata>, Future<Long>>() {
-                    public Future<Long> apply(List<LogSegmentMetadata> 
ledgerList) {
+                return readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, 
Future<Long>>() {
+                    public Future<Long> 
apply(Versioned<List<LogSegmentMetadata>> ledgerList) {
 
-                        List<Future<Long>> futureCounts = new 
ArrayList<Future<Long>>(ledgerList.size());
-                        for (LogSegmentMetadata ledger : ledgerList) {
+                        List<Future<Long>> futureCounts = new 
ArrayList<Future<Long>>(ledgerList.getValue().size());
+                        for (LogSegmentMetadata ledger : 
ledgerList.getValue()) {
                             if (ledger.getLogSegmentSequenceNumber() >= 
beginDLSN.getLogSegmentSequenceNo()) {
                                 
futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
                             }
@@ -665,11 +517,25 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         return logMetadata.getFullyQualifiedName();
     }
 
-    // Ledgers Related Functions
+    // Log Segments Related Functions
+    //
     // ***Note***
-    // Get ledger list should go through #getCachedLogSegments as we need to 
assign start sequence id for inprogress log
-    // segment so the reader could generate the right sequence id.
+    // Get log segment list should go through #getCachedLogSegments as we need 
to assign start sequence id
+    // for inprogress log segment so the reader could generate the right 
sequence id.
+    //
+    // ***PerStreamCache vs LogSegmentMetadataCache **
+    // The per stream cache maintains the list of segments per stream, while 
the metadata cache
+    // maintains log segments. The metadata cache is just to reduce the access 
to zookeeper, it is
+    // okay that some of the log segments are not in the cache; however the 
per stream cache can not
+    // have any gaps between log segment sequence numbers which it has to be 
accurate.
 
+    /**
+     * Get the cached log segments.
+     *
+     * @param comparator the comparator to sort the returned log segments.
+     * @return list of sorted log segments
+     * @throws UnexpectedException if unexpected condition detected.
+     */
     protected List<LogSegmentMetadata> 
getCachedLogSegments(Comparator<LogSegmentMetadata> comparator)
         throws UnexpectedException {
         try {
@@ -683,227 +549,6 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         }
     }
 
-    protected List<LogSegmentMetadata> getFullLedgerList(boolean forceFetch, 
boolean throwOnEmpty)
-            throws IOException {
-        return getLedgerList(forceFetch, true, LogSegmentMetadata.COMPARATOR, 
throwOnEmpty);
-    }
-
-    protected List<LogSegmentMetadata> getFullLedgerListDesc(boolean 
forceFetch, boolean throwOnEmpty)
-            throws IOException {
-        return getLedgerList(forceFetch, true, 
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
-    }
-
-    protected List<LogSegmentMetadata> getFilteredLedgerList(boolean 
forceFetch, boolean throwOnEmpty)
-            throws IOException {
-        return getLedgerList(forceFetch, false, LogSegmentMetadata.COMPARATOR, 
throwOnEmpty);
-    }
-
-    protected List<LogSegmentMetadata> getFilteredLedgerListDesc(boolean 
forceFetch, boolean throwOnEmpty)
-            throws IOException {
-        return getLedgerList(forceFetch, false, 
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
-    }
-
-    protected List<LogSegmentMetadata> getLedgerList(boolean forceFetch,
-                                                     boolean fetchFullList,
-                                                     
Comparator<LogSegmentMetadata> comparator,
-                                                     boolean throwOnEmpty)
-            throws IOException {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        boolean success = false;
-        try {
-            List<LogSegmentMetadata> segments =
-                    doGetLedgerList(forceFetch, fetchFullList, comparator, 
throwOnEmpty);
-            success = true;
-            return segments;
-        } finally {
-            OpStatsLogger statsLogger = fetchFullList ? getFullListStat : 
getFilteredListStat;
-            if (success) {
-                
statsLogger.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            } else {
-                
statsLogger.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            }
-        }
-    }
-
-    private List<LogSegmentMetadata> doGetLedgerList(boolean forceFetch, 
boolean fetchFullList,
-                                                     
Comparator<LogSegmentMetadata> comparator,
-                                                     boolean throwOnEmpty)
-        throws IOException {
-        if (fetchFullList) {
-            if (forceFetch || !isFullListFetched.get()) {
-                return forceGetLedgerList(comparator, 
LogSegmentFilter.DEFAULT_FILTER, throwOnEmpty);
-            } else {
-                return getCachedLogSegments(comparator);
-            }
-        } else {
-            if (forceFetch) {
-                return forceGetLedgerList(comparator, filter, throwOnEmpty);
-            } else {
-                if(!ledgerListWatchSet.get()) {
-                    scheduleGetLedgersTask(true, true);
-                }
-                waitFirstGetLedgersTaskToFinish();
-                return getCachedLogSegments(comparator);
-            }
-        }
-    }
-
-    /**
-     * Get a list of all segments in the journal.
-     */
-    protected List<LogSegmentMetadata> forceGetLedgerList(final 
Comparator<LogSegmentMetadata> comparator,
-                                                                final 
LogSegmentFilter segmentFilter,
-                                                                boolean 
throwOnEmpty) throws IOException {
-        final List<LogSegmentMetadata> ledgers = new 
ArrayList<LogSegmentMetadata>();
-        final AtomicInteger result = new AtomicInteger(-1);
-        final CountDownLatch latch = new CountDownLatch(1);
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        asyncGetLedgerListInternal(comparator, segmentFilter, null, new 
GenericCallback<List<LogSegmentMetadata>>() {
-            @Override
-            public void operationComplete(int rc, List<LogSegmentMetadata> 
logSegmentMetadatas) {
-                result.set(rc);
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    ledgers.addAll(logSegmentMetadatas);
-                } else {
-                    LOG.error("Failed to get ledger list for {} : with error 
{}", getFullyQualifiedName(), rc);
-                }
-                latch.countDown();
-            }
-        }, new AtomicInteger(conf.getZKNumRetries()), new 
AtomicLong(conf.getZKRetryBackoffStartMillis()));
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            
forceGetListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            throw new DLInterruptedException("Interrupted on reading ledger 
list from zkfor " + getFullyQualifiedName(), e);
-        }
-        long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-
-        KeeperException.Code rc = KeeperException.Code.get(result.get());
-        if (rc == KeeperException.Code.OK) {
-            forceGetListStat.registerSuccessfulEvent(elapsedMicros);
-        } else {
-            forceGetListStat.registerFailedEvent(elapsedMicros);
-            if (KeeperException.Code.NONODE == rc) {
-                throw new LogNotFoundException("Log " + 
getFullyQualifiedName() + " is not found");
-            } else {
-                throw new IOException("ZK Exception " + rc + " reading ledger 
list for " + getFullyQualifiedName());
-            }
-        }
-
-        if (throwOnEmpty && ledgers.isEmpty()) {
-            throw new LogEmptyException("Log " + getFullyQualifiedName() + " 
is empty");
-        }
-        return ledgers;
-    }
-
-    protected Future<List<LogSegmentMetadata>> asyncGetFullLedgerList(boolean 
forceFetch, boolean throwOnEmpty) {
-        return asyncGetLedgerList(forceFetch, true, 
LogSegmentMetadata.COMPARATOR, throwOnEmpty);
-    }
-
-    protected Future<List<LogSegmentMetadata>> 
asyncGetFullLedgerListDesc(boolean forceFetch, boolean throwOnEmpty) {
-        return asyncGetLedgerList(forceFetch, true, 
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
-    }
-
-    protected Future<List<LogSegmentMetadata>> 
asyncGetFilteredLedgerList(boolean forceFetch, boolean throwOnEmpty) {
-        return asyncGetLedgerList(forceFetch, false, 
LogSegmentMetadata.COMPARATOR, throwOnEmpty);
-    }
-
-    protected Future<List<LogSegmentMetadata>> 
asyncGetFilteredLedgerListDesc(boolean forceFetch, boolean throwOnEmpty) {
-        return asyncGetLedgerList(forceFetch, false, 
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
-    }
-
-    protected Future<List<LogSegmentMetadata>> asyncGetLedgerList(final 
boolean forceFetch,
-                                                                        final 
boolean fetchFullList,
-                                                                        final 
Comparator<LogSegmentMetadata> comparator,
-                                                                        final 
boolean throwOnEmpty) {
-        final Promise<List<LogSegmentMetadata>> promise = new 
Promise<List<LogSegmentMetadata>>();
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        final OpStatsLogger statsLogger = fetchFullList ? getFullListStat : 
getFilteredListStat;
-        asyncDoGetLedgerList(forceFetch, fetchFullList, comparator, 
throwOnEmpty)
-                .addEventListener(new 
FutureEventListener<List<LogSegmentMetadata>>() {
-                    @Override
-                    public void onSuccess(List<LogSegmentMetadata> value) {
-                        
statsLogger.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                        promise.setValue(value);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        
statsLogger.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                        promise.setException(cause);
-                    }
-                });
-        return promise;
-    }
-
-    private Future<List<LogSegmentMetadata>> asyncDoGetLedgerList(final 
boolean forceFetch,
-                                                                  final 
boolean fetchFullList,
-                                                                  final 
Comparator<LogSegmentMetadata> comparator,
-                                                                  final 
boolean throwOnEmpty) {
-        if (fetchFullList) {
-            if (forceFetch || !isFullListFetched.get()) {
-                return asyncForceGetLedgerList(comparator, 
LogSegmentFilter.DEFAULT_FILTER, throwOnEmpty);
-            } else {
-                try {
-                    return Future.value(getCachedLogSegments(comparator));
-                } catch (UnexpectedException ue) {
-                    return Future.exception(ue);
-                }
-            }
-        } else {
-            if (forceFetch) {
-                return asyncForceGetLedgerList(comparator, filter, 
throwOnEmpty);
-            } else {
-                final Promise<List<LogSegmentMetadata>> promise =
-                        new Promise<List<LogSegmentMetadata>>();
-                SyncGetLedgersCallback task = firstGetLedgersTask;
-                task.promise.addEventListener(new 
FutureEventListener<List<LogSegmentMetadata>>() {
-                    @Override
-                    public void onSuccess(List<LogSegmentMetadata> value) {
-                        try {
-                            promise.setValue(getCachedLogSegments(comparator));
-                        } catch (UnexpectedException e) {
-                            promise.setException(e);
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        promise.setException(cause);
-                    }
-                });
-                return promise;
-            }
-        }
-    }
-
-    protected Future<List<LogSegmentMetadata>> asyncForceGetLedgerList(final 
Comparator<LogSegmentMetadata> comparator,
-                                                                       final 
LogSegmentFilter segmentFilter,
-                                                                       final 
boolean throwOnEmpty) {
-        final Promise<List<LogSegmentMetadata>> promise = new 
Promise<List<LogSegmentMetadata>>();
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        asyncGetLedgerListWithRetries(comparator, segmentFilter, null)
-            .addEventListener(new 
FutureEventListener<List<LogSegmentMetadata>>() {
-
-                @Override
-                public void onSuccess(List<LogSegmentMetadata> ledgers) {
-                    
forceGetListStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                    if (ledgers.isEmpty() && throwOnEmpty) {
-                        promise.setException(new LogEmptyException("Log " + 
getFullyQualifiedName() + " is empty"));
-                    } else {
-                        promise.setValue(ledgers);
-                    }
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    
forceGetListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                    promise.setException(cause);
-                }
-            });
-        return promise;
-    }
-
     /**
      * Add the segment <i>metadata</i> for <i>name</i> in the cache.
      *
@@ -913,6 +558,7 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
      *          segment metadata.
      */
     protected void addLogSegmentToCache(String name, LogSegmentMetadata 
metadata) {
+        metadataCache.put(metadata.getZkPath(), metadata);
         logSegmentCache.add(name, metadata);
         // update the last ledger rolling time
         if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis < 
metadata.getCompletionTime())) {
@@ -952,224 +598,189 @@ public abstract class BKLogHandler implements Watcher, 
AsyncCloseable, AsyncAbor
         }
     }
 
+    /**
+     * Read log segment <i>name</i> from the cache.
+     *
+     * @param name name of the log segment
+     * @return log segment metadata
+     */
     protected LogSegmentMetadata readLogSegmentFromCache(String name) {
         return logSegmentCache.get(name);
     }
 
+    /**
+     * Remove the log segment <i>name</i> from the cache.
+     *
+     * @param name name of the log segment.
+     * @return log segment metadata
+     */
     protected LogSegmentMetadata removeLogSegmentFromCache(String name) {
+        metadataCache.invalidate(name);
         return logSegmentCache.remove(name);
     }
 
-    public void asyncGetLedgerList(final Comparator<LogSegmentMetadata> 
comparator,
-                                   Watcher watcher,
-                                   final 
GenericCallback<List<LogSegmentMetadata>> callback) {
-        asyncGetLedgerListWithRetries(comparator, filter, watcher, callback);
+    /**
+     * Update the log segment cache with updated mapping
+     *
+     * @param logSegmentsRemoved log segments removed
+     * @param logSegmentsAdded log segments added
+     */
+    protected void updateLogSegmentCache(Set<String> logSegmentsRemoved,
+                                         Map<String, LogSegmentMetadata> 
logSegmentsAdded) {
+        for (String segmentName : logSegmentsRemoved) {
+            metadataCache.invalidate(segmentName);
+        }
+        for (Map.Entry<String, LogSegmentMetadata> entry : 
logSegmentsAdded.entrySet()) {
+            metadataCache.put(entry.getKey(), entry.getValue());
+        }
+        logSegmentCache.update(logSegmentsRemoved, logSegmentsAdded);
     }
 
-    protected Future<List<LogSegmentMetadata>> 
asyncGetLedgerListWithRetries(Comparator<LogSegmentMetadata> comparator,
-                                                                             
LogSegmentFilter segmentFilter,
-                                                                             
Watcher watcher) {
-        final Promise<List<LogSegmentMetadata>> promise = new 
Promise<List<LogSegmentMetadata>>();
-        asyncGetLedgerListWithRetries(comparator, segmentFilter, watcher, new 
GenericCallback<List<LogSegmentMetadata>>() {
-            @Override
-            public void operationComplete(int rc, List<LogSegmentMetadata> 
segments) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    promise.setValue(segments);
-                } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                    promise.setException(new LogNotFoundException("Log " + 
getFullyQualifiedName() + " not found"));
-                } else {
-                    String errMsg = "ZK Exception " + rc + " reading ledger 
list for " + getFullyQualifiedName();
-                    promise.setException(new ZKException(errMsg, 
KeeperException.Code.get(rc)));
-                }
-            }
-        });
-        return promise;
-    }
+    /**
+     * Read the log segments from the store and register a listener
+     * @param comparator
+     * @param segmentFilter
+     * @param logSegmentNamesListener
+     * @return future represents the result of log segments
+     */
+    public Future<Versioned<List<LogSegmentMetadata>>> 
readLogSegmentsFromStore(
+            final Comparator<LogSegmentMetadata> comparator,
+            final LogSegmentFilter segmentFilter,
+            final LogSegmentNamesListener logSegmentNamesListener) {
+        final Promise<Versioned<List<LogSegmentMetadata>>> readResult =
+                new Promise<Versioned<List<LogSegmentMetadata>>>();
+        metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), 
logSegmentNamesListener)
+                .addEventListener(new 
FutureEventListener<Versioned<List<String>>>() {
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        FutureUtils.setException(readResult, cause);
+                    }
 
-    private void asyncGetLedgerListWithRetries(final Comparator<LogSegmentMetadata> comparator,
-                                               final LogSegmentFilter segmentFilter,
-                                               final Watcher watcher,
-                                               final GenericCallback<List<LogSegmentMetadata>> finalCallback) {
-        asyncGetLedgerListInternal(comparator, segmentFilter, watcher, finalCallback,
-                new AtomicInteger(conf.getZKNumRetries()), new AtomicLong(conf.getZKRetryBackoffStartMillis()));
+                    @Override
+                    public void onSuccess(Versioned<List<String>> logSegmentNames) {
+                        readLogSegmentsFromStore(logSegmentNames, comparator, segmentFilter, readResult);
+                    }
+                });
+        return readResult;
     }
 
-    private void asyncGetLedgerListInternal(final Comparator<LogSegmentMetadata> comparator,
+    protected void readLogSegmentsFromStore(final Versioned<List<String>> logSegmentNames,
+                                            final Comparator<LogSegmentMetadata> comparator,
                                             final LogSegmentFilter segmentFilter,
-                                            final Watcher watcher,
-                                            final GenericCallback<List<LogSegmentMetadata>> finalCallback,
-                                            final AtomicInteger numAttemptsLeft,
-                                            final AtomicLong backoffMillis) {
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        try {
+                                            final Promise<Versioned<List<LogSegmentMetadata>>> readResult) {
+        Set<String> segmentsReceived = new HashSet<String>();
+        segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue()));
+        Set<String> segmentsAdded;
+        final Set<String> removedSegments = Collections.synchronizedSet(new HashSet<String>());
+        final Map<String, LogSegmentMetadata> addedSegments =
+                Collections.synchronizedMap(new HashMap<String, LogSegmentMetadata>());
+        Pair<Set<String>, Set<String>> segmentChanges = logSegmentCache.diff(segmentsReceived);
+        segmentsAdded = segmentChanges.getLeft();
+        removedSegments.addAll(segmentChanges.getRight());
+
+        if (segmentsAdded.isEmpty()) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Async getting ledger list for {}.", getFullyQualifiedName());
+                LOG.trace("No segments added for {}.", getFullyQualifiedName());
             }
-            final GenericCallback<List<LogSegmentMetadata>> callback = new GenericCallback<List<LogSegmentMetadata>>() {
-                @Override
-                public void operationComplete(int rc, List<LogSegmentMetadata> result) {
-                    long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-                    if (KeeperException.Code.OK.intValue() != rc) {
-                        getListStat.registerFailedEvent(elapsedMicros);
-                    } else {
-                        if (LogSegmentFilter.DEFAULT_FILTER == segmentFilter) {
-                            isFullListFetched.set(true);
-                        }
-                        getListStat.registerSuccessfulEvent(elapsedMicros);
-                    }
-                    finalCallback.operationComplete(rc, result);
-                }
-            };
-            zooKeeperClient.get().getChildren(logMetadata.getLogSegmentsPath(), watcher, new AsyncCallback.Children2Callback() {
-                @Override
-                public void processResult(final int rc, final String path, final Object ctx, final List<String> children, final Stat stat) {
-                    if (KeeperException.Code.OK.intValue() != rc) {
 
-                        if ((KeeperException.Code.CONNECTIONLOSS.intValue() == rc ||
-                             KeeperException.Code.SESSIONEXPIRED.intValue() == rc ||
-                             KeeperException.Code.SESSIONMOVED.intValue() == rc) &&
-                            numAttemptsLeft.decrementAndGet() > 0) {
-                            long backoffMs = backoffMillis.get();
-                            backoffMillis.set(Math.min(conf.getZKRetryBackoffMaxMillis(), 2 * backoffMs));
-                            scheduler.schedule(new Runnable() {
-                                @Override
-                                public void run() {
-                                    asyncGetLedgerListInternal(comparator, segmentFilter, watcher,
-                                            finalCallback, numAttemptsLeft, backoffMillis);
-                                }
-                            }, backoffMs, TimeUnit.MILLISECONDS);
-                            return;
-                        }
-                        callback.operationComplete(rc, null);
-                        return;
-                    }
-
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Got ledger list from {} : {}", logMetadata.getLogSegmentsPath(), children);
-                    }
+            // update the cache before #getCachedLogSegments to return
+            updateLogSegmentCache(removedSegments, addedSegments);
 
-                    ledgerListWatchSet.set(true);
-                    Set<String> segmentsReceived = new HashSet<String>();
-                    segmentsReceived.addAll(segmentFilter.filter(children));
-                    Set<String> segmentsAdded;
-                    final Set<String> removedSegments = Collections.synchronizedSet(new HashSet<String>());
-                    final Map<String, LogSegmentMetadata> addedSegments =
-                            Collections.synchronizedMap(new HashMap<String, LogSegmentMetadata>());
-                    Pair<Set<String>, Set<String>> segmentChanges = logSegmentCache.diff(segmentsReceived);
-                    segmentsAdded = segmentChanges.getLeft();
-                    removedSegments.addAll(segmentChanges.getRight());
+            List<LogSegmentMetadata> segmentList;
+            try {
+                segmentList = getCachedLogSegments(comparator);
+            } catch (UnexpectedException e) {
+                FutureUtils.setException(readResult, e);
+                return;
+            }
 
-                    if (segmentsAdded.isEmpty()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("No segments added for {}.", getFullyQualifiedName());
-                        }
+            FutureUtils.setValue(readResult,
+                    new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion()));
+            return;
+        }
 
-                        // update the cache before fetch
-                        logSegmentCache.update(removedSegments, addedSegments);
+        final AtomicInteger numChildren = new AtomicInteger(segmentsAdded.size());
+        final AtomicInteger numFailures = new AtomicInteger(0);
+        for (final String segment: segmentsAdded) {
+            String logSegmentPath = logMetadata.getLogSegmentPath(segment);
+            LogSegmentMetadata cachedSegment = metadataCache.get(logSegmentPath);
+            if (null != cachedSegment) {
+                addedSegments.put(segment, cachedSegment);
+                completeReadLogSegmentsFromStore(
+                        removedSegments,
+                        addedSegments,
+                        comparator,
+                        readResult,
+                        logSegmentNames.getVersion(),
+                        numChildren,
+                        numFailures);
+                continue;
+            }
+            metadataStore.getLogSegment(logSegmentPath)
+                    .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
 
-                        List<LogSegmentMetadata> segmentList;
-                        try {
-                            segmentList = getCachedLogSegments(comparator);
-                        } catch (UnexpectedException e) {
-                            callback.operationComplete(KeeperException.Code.DATAINCONSISTENCY.intValue(), null);
-                            return;
-                        }
-                        callback.operationComplete(KeeperException.Code.OK.intValue(), segmentList);
-                        notifyUpdatedLogSegments(segmentList);
-                        if (!removedSegments.isEmpty()) {
-                            notifyOnOperationComplete();
+                        @Override
+                        public void onSuccess(LogSegmentMetadata result) {
+                            addedSegments.put(segment, result);
+                            complete();
                         }
-                        return;
-                    }
 
-                    final AtomicInteger numChildren = new AtomicInteger(segmentsAdded.size());
-                    final AtomicInteger numFailures = new AtomicInteger(0);
-                    for (final String segment: segmentsAdded) {
-                        metadataStore.getLogSegment(logMetadata.getLogSegmentPath(segment))
-                                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
-
-                                    @Override
-                                    public void onSuccess(LogSegmentMetadata result) {
-                                        addedSegments.put(segment, result);
-                                        complete();
-                                    }
-
-                                    @Override
-                                    public void onFailure(Throwable cause) {
-                                        // NONODE exception is possible in two cases
-                                        // 1. A log segment was deleted by truncation between the call to getChildren and read
-                                        // attempt on the znode corresponding to the segment
-                                        // 2. In progress segment has been completed => inprogress ZNode does not exist
-                                        if (cause instanceof KeeperException &&
-                                                KeeperException.Code.NONODE == ((KeeperException) cause).code()) {
-                                            removedSegments.add(segment);
-                                            complete();
-                                        } else {
-                                            // fail fast
-                                            if (1 == numFailures.incrementAndGet()) {
-                                                int rcToReturn = KeeperException.Code.SYSTEMERROR.intValue();
-                                                if (cause instanceof KeeperException) {
-                                                    rcToReturn = ((KeeperException) cause).code().intValue();
-                                                } else if (cause instanceof ZKException) {
-                                                    rcToReturn = ((ZKException) cause).getKeeperExceptionCode().intValue();
-                                                }
-                                                // :( properly we need dlog related response code.
-                                                callback.operationComplete(rcToReturn, null);
-                                                return;
-                                            }
-                                        }
-                                    }
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            // LogSegmentNotFoundException exception is possible in two cases
+                            // 1. A log segment was deleted by truncation between the call to getChildren and read
+                            // attempt on the znode corresponding to the segment
+                            // 2. In progress segment has been completed => inprogress ZNode does not exist
+                            if (cause instanceof LogSegmentNotFoundException) {
+                                removedSegments.add(segment);
+                                complete();
+                            } else {
+                                // fail fast
+                                if (1 == numFailures.incrementAndGet()) {
+                                    FutureUtils.setException(readResult, cause);
+                                    return;
+                                }
+                            }
+                        }
 
-                                    private void complete() {
-                                        if (0 == numChildren.decrementAndGet() && numFailures.get() == 0) {
-                                            // update the cache only when fetch completed
-                                            logSegmentCache.update(removedSegments, addedSegments);
-                                            List<LogSegmentMetadata> segmentList;
-                                            try {
-                                                segmentList = getCachedLogSegments(comparator);
-                                            } catch (UnexpectedException e) {
-                                                callback.operationComplete(KeeperException.Code.DATAINCONSISTENCY.intValue(), null);
-                                                return;
-                                            }
-                                            callback.operationComplete(KeeperException.Code.OK.intValue(), segmentList);
-                                            notifyUpdatedLogSegments(segmentList);
-                                            notifyOnOperationComplete();
-                                        }
-                                    }
-                                });
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            getListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            finalCallback.operationComplete(KeeperException.Code.CONNECTIONLOSS.intValue(), null);
-        } catch (InterruptedException e) {
-            getListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            finalCallback.operationComplete(KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+                        private void complete() {
+                            completeReadLogSegmentsFromStore(
+                                    removedSegments,
+                                    addedSegments,
+                                    comparator,
+                                    readResult,
+                                    logSegmentNames.getVersion(),
+                                    numChildren,
+                                    numFailures);
+                        }
+                    });
         }
     }
 
-    @Override
-    public void process(WatchedEvent event) {
-        if (Watcher.Event.EventType.None.equals(event.getType())) {
-            if (event.getState() == Watcher.Event.KeeperState.Expired) {
-                // if the watcher is expired
-                scheduler.schedule(new WatcherGetLedgersCallback(getFullyQualifiedName()),
-                        conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
-            }
-        } else if (Watcher.Event.EventType.NodeChildrenChanged.equals(event.getType())) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("LogSegments Changed under {}.", getFullyQualifiedName());
-            }
-            asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter,
-                    getChildrenWatcher, new WatcherGetLedgersCallback(getFullyQualifiedName()));
+    private void completeReadLogSegmentsFromStore(final Set<String> removedSegments,
+                                                  final Map<String, LogSegmentMetadata> addedSegments,
+                                                  final Comparator<LogSegmentMetadata> comparator,
+                                                  final Promise<Versioned<List<LogSegmentMetadata>>> readResult,
+                                                  final Version logSegmentNamesVersion,
+                                                  final AtomicInteger numChildren,
+                                                  final AtomicInteger numFailures) {
+        if (0 != numChildren.decrementAndGet()) {
+            return;
         }
-    }
-
-    void notifyOnOperationComplete() {
-        if (null != notification) {
-            notification.notifyOnOperationComplete();
+        if (numFailures.get() > 0) {
+            return;
+        }
+        // update the cache only when fetch completed and before #getCachedLogSegments
+        updateLogSegmentCache(removedSegments, addedSegments);
+        List<LogSegmentMetadata> segmentList;
+        try {
+            segmentList = getCachedLogSegments(comparator);
+        } catch (UnexpectedException e) {
+            FutureUtils.setException(readResult, e);
+            return;
         }
+        FutureUtils.setValue(readResult,
+            new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion));
     }
 
 }
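
For context, here is a sketch of how a caller might consume the new readLogSegmentsFromStore API shown above. This is an illustrative fragment, not part of the commit: the readHandler variable, the surrounding caller context, and the use of a null LogSegmentNamesListener for a one-shot fetch are assumptions; LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, FutureEventListener and Versioned are the same types used in the diff.

    // hypothetical caller; `readHandler` is assumed to be a BKLogHandler-style handler
    // exposing the readLogSegmentsFromStore method introduced in this change
    readHandler.readLogSegmentsFromStore(
            LogSegmentMetadata.COMPARATOR,
            LogSegmentFilter.DEFAULT_FILTER,
            null /* assumption: no names listener registered for a one-shot read */)
        .addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> logSegments) {
                // logSegments.getValue() is the log segment list ordered by the comparator;
                // logSegments.getVersion() is the version of the segment name list observed
                // when the names were read from the metadata store.
            }

            @Override
            public void onFailure(Throwable cause) {
                // metadata store failures are surfaced here, propagated via
                // FutureUtils.setException(readResult, cause) in the code above
            }
        });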
