http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index 5c15009..9ba8ca4 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -24,7 +24,6 @@ import com.twitter.distributedlog.AsyncNotification;
 import com.twitter.distributedlog.BKLogHandler;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
 import com.twitter.distributedlog.LedgerDescriptor;
 import com.twitter.distributedlog.LedgerHandleCache;
 import com.twitter.distributedlog.LedgerReadPosition;
@@ -32,13 +31,16 @@ import 
com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.LogReadException;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ReadAheadCache;
-import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.callback.ReadAheadCallback;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.exceptions.ZKException;
 import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.io.AsyncCloseable;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
@@ -55,10 +57,8 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -97,7 +97,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Exceptions Handling Phase: Handle all the exceptions and properly schedule 
next readahead request.
  * </p>
  */
-public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, 
AsyncCloseable {
+public class ReadAheadWorker implements ReadAheadCallback, Runnable, 
AsyncCloseable, LogSegmentListener {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ReadAheadWorker.class);
 
@@ -115,7 +115,6 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
     protected final AsyncNotification notification;
 
     // resources
-    private final ZooKeeperClient zkc;
     protected final OrderedScheduler scheduler;
     private final LedgerHandleCache handleCache;
     private final ReadAheadCache readAheadCache;
@@ -144,10 +143,6 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
     // LogSegments & Metadata Notification
     //
 
-    // variables related to getting log segments from zookeeper
-    volatile boolean zkNotificationDisabled = false;
-    private final Watcher getLedgersWatcher;
-
     // variables related to zookeeper watcher notification to interrupt long 
poll waits
     final Object notificationLock = new Object();
     AsyncNotification metadataNotification = null;
@@ -155,11 +150,13 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
 
     // variables related to log segments
     private volatile boolean reInitializeMetadata = true;
+    private volatile boolean forceReadLogSegments = false;
     volatile boolean inProgressChanged = false;
     private LogSegmentMetadata currentMetadata = null;
     private int currentMetadataIndex;
     protected LedgerDescriptor currentLH;
-    private volatile List<LogSegmentMetadata> ledgerList;
+    private volatile List<LogSegmentMetadata> logSegmentListNotified;
+    private volatile List<LogSegmentMetadata> logSegmentList;
 
     //
     // ReadAhead Phases
@@ -208,7 +205,6 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
                            DynamicDistributedLogConfiguration dynConf,
                            ZKLogMetadataForReader logMetadata,
                            BKLogHandler ledgerManager,
-                           ZooKeeperClient zkc,
                            OrderedScheduler scheduler,
                            LedgerHandleCache handleCache,
                            LedgerReadPosition startPosition,
@@ -229,7 +225,6 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         this.isHandleForReading = isHandleForReading;
         this.notification = notification;
         // Resources
-        this.zkc = zkc;
         this.scheduler = scheduler;
         this.handleCache = handleCache;
         this.readAheadCache = readAheadCache;
@@ -237,8 +232,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         this.startReadPosition = new LedgerReadPosition(startPosition);
         this.nextReadAheadPosition = new LedgerReadPosition(startPosition);
         // LogSegments
-        this.getLedgersWatcher = this.zkc.getWatcherManager()
-                .registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
+
         // Failure Detection
         this.failureInjector = failureInjector;
         // Tracing
@@ -283,12 +277,12 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
     // ReadAhead Status
     //
 
-    void setReadAheadError(ReadAheadTracker tracker) {
+    void setReadAheadError(ReadAheadTracker tracker, Throwable cause) {
         LOG.error("Read Ahead for {} is set to error.", 
logMetadata.getFullyQualifiedName());
         readAheadError = true;
         tracker.enterPhase(ReadAheadPhase.ERROR);
         if (null != notification) {
-            notification.notifyOnError();
+            notification.notifyOnError(cause);
         }
         if (null != stopPromise) {
             FutureUtils.setValue(stopPromise, null);
@@ -299,7 +293,8 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         readAheadInterrupted = true;
         tracker.enterPhase(ReadAheadPhase.INTERRUPTED);
         if (null != notification) {
-            notification.notifyOnError();
+            notification.notifyOnError(new DLInterruptedException("ReadAhead 
worker for "
+                    + bkLedgerManager.getFullyQualifiedName() + " is 
interrupted."));
         }
         if (null != stopPromise) {
             FutureUtils.setValue(stopPromise, null);
@@ -310,7 +305,9 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         readingFromTruncated = true;
         tracker.enterPhase(ReadAheadPhase.TRUNCATED);
         if (null != notification) {
-            notification.notifyOnError();
+            notification.notifyOnError(
+                    new 
AlreadyTruncatedTransactionException(logMetadata.getFullyQualifiedName()
+                            + ": Trying to position read ahead to a segment 
that is marked truncated"));
         }
         if (null != stopPromise) {
             FutureUtils.setValue(stopPromise, null);
@@ -347,9 +344,11 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         return !isCatchingUp;
     }
 
-    public void start() {
-        LOG.debug("Starting ReadAhead Worker for {}", fullyQualifiedName);
+    public void start(List<LogSegmentMetadata> segmentList) {
+        LOG.debug("Starting ReadAhead Worker for {} : segments = {}",
+                fullyQualifiedName, segmentList);
         running = true;
+        logSegmentListNotified = segmentList;
         schedulePhase.process(BKException.Code.OK);
     }
 
@@ -358,9 +357,6 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         LOG.info("Stopping Readahead worker for {}", fullyQualifiedName);
         running = false;
 
-        this.zkc.getWatcherManager()
-                .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), 
this, true);
-
         // Aside from unfortunate naming of variables, this allows
         // the currently active long poll to be interrupted and completed
         AsyncNotification notification;
@@ -417,7 +413,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
                 } catch (RuntimeException rte) {
                     LOG.error("ReadAhead on stream {} encountered runtime 
exception",
                             logMetadata.getFullyQualifiedName(), rte);
-                    setReadAheadError(tracker);
+                    setReadAheadError(tracker, rte);
                     throw rte;
                 }
             }
@@ -455,7 +451,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
         try {
             scheduler.submit(addRTEHandler(runnable));
         } catch (RejectedExecutionException ree) {
-            setReadAheadError(tracker);
+            setReadAheadError(tracker, ree);
         }
     }
 
@@ -470,7 +466,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
             scheduler.schedule(addRTEHandler(task), timeInMillis, 
TimeUnit.MILLISECONDS);
             readAheadWorkerWaits.inc();
         } catch (RejectedExecutionException ree) {
-            setReadAheadError(tracker);
+            setReadAheadError(tracker, ree);
         }
     }
 
@@ -533,12 +529,14 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
                     LOG.error("{} : BookKeeper Client used by the ReadAhead 
Thread has encountered {} zookeeper exceptions : simulate = {}",
                               new Object[] { fullyQualifiedName, 
bkcZkExceptions.get(), injectErrors });
                     running = false;
-                    setReadAheadError(tracker);
+                    setReadAheadError(tracker, new LogReadException(
+                            "Encountered too many zookeeper issues on read 
ahead for " + bkLedgerManager.getFullyQualifiedName()));
                 } else if (bkcUnExpectedExceptions.get() > 
BKC_UNEXPECTED_EXCEPTION_THRESHOLD) {
                     LOG.error("{} : ReadAhead Thread has encountered {} 
unexpected BK exceptions.",
                               fullyQualifiedName, 
bkcUnExpectedExceptions.get());
                     running = false;
-                    setReadAheadError(tracker);
+                    setReadAheadError(tracker, new LogReadException(
+                            "Encountered too many unexpected bookkeeper issues 
on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
                 } else {
                     // We must always reinitialize metadata if the last 
attempt to read failed.
                     reInitializeMetadata = true;
@@ -629,39 +627,21 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
      * Phase on checking in progress changed.
      */
     final class CheckInProgressChangedPhase extends Phase
-        implements 
BookkeeperInternalCallbacks.GenericCallback<List<LogSegmentMetadata>> {
+            implements 
FutureEventListener<Versioned<List<LogSegmentMetadata>>> {
 
         CheckInProgressChangedPhase(Phase next) {
             super(next);
         }
 
-        @Override
-        public void operationComplete(final int rc, final 
List<LogSegmentMetadata> result) {
+        void processLogSegments(final List<LogSegmentMetadata> segments) {
             // submit callback execution to dlg executor to avoid deadlock.
             submit(new Runnable() {
                 @Override
                 public void run() {
-                    if (KeeperException.Code.OK.intValue() != rc) {
-                        if (KeeperException.Code.NONODE.intValue() == rc) {
-                            LOG.info("Log {} has been deleted. Set ReadAhead 
to error to stop reading.",
-                                    logMetadata.getFullyQualifiedName());
-                            logDeleted = true;
-                            setReadAheadError(tracker);
-                            return;
-                        }
-                        LOG.info("ZK Exception {} while reading ledger list", 
rc);
-                        reInitializeMetadata = true;
-                        if 
(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-                            handleException(ReadAheadPhase.GET_LEDGERS, 
BKException.Code.InterruptedException);
-                        } else {
-                            handleException(ReadAheadPhase.GET_LEDGERS, 
BKException.Code.ZKException);
-                        }
-                        return;
-                    }
-                    ledgerList = result;
+                    logSegmentList = segments;
                     boolean isInitialPositioning = 
nextReadAheadPosition.definitelyLessThanOrEqualTo(startReadPosition);
-                    for (int i = 0; i < ledgerList.size(); i++) {
-                        LogSegmentMetadata l = ledgerList.get(i);
+                    for (int i = 0; i < logSegmentList.size(); i++) {
+                        LogSegmentMetadata l = logSegmentList.get(i);
                         // By default we should skip truncated segments during 
initial positioning
                         if (l.isTruncated() &&
                             isInitialPositioning &&
@@ -820,11 +800,34 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
                             
TimeUnit.MILLISECONDS.toMicros(elapsedMillisSinceMetadataChanged));
                     metadataNotificationTimeMillis = -1L;
                 }
-                
bkLedgerManager.asyncGetLedgerList(LogSegmentMetadata.COMPARATOR, 
getLedgersWatcher, this);
+                if (forceReadLogSegments) {
+                    forceReadLogSegments = false;
+                    bkLedgerManager.readLogSegmentsFromStore(
+                            LogSegmentMetadata.COMPARATOR,
+                            LogSegmentFilter.DEFAULT_FILTER,
+                            null
+                    ).addEventListener(this);
+                } else {
+                    processLogSegments(logSegmentListNotified);
+                }
             } else {
                 next.process(BKException.Code.OK);
             }
         }
+
+        @Override
+        public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
+            processLogSegments(segments.getValue());
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            LOG.info("Encountered metadata exception while reading log 
segments of {} : {}. Retrying ...",
+                    bkLedgerManager.getFullyQualifiedName(), 
cause.getMessage());
+            reInitializeMetadata = true;
+            forceReadLogSegments = true;
+            handleException(ReadAheadPhase.GET_LEDGERS, 
BKException.Code.ZKException);
+        }
     }
 
     final class OpenLedgerPhase extends Phase
@@ -922,6 +925,7 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
                                     LOG.info("{} Ledger {} for inprogress 
segment {} closed for idle reader warn threshold",
                                         new Object[] { fullyQualifiedName, 
currentMetadata, currentLH });
                                     reInitializeMetadata = true;
+                                    forceReadLogSegments = true;
                                 }
                             } else {
                                 lastLedgerCloseDetected.reset().start();
@@ -986,7 +990,9 @@ public class ReadAheadWorker implements ReadAheadCallback, 
Runnable, Watcher, As
                                 }
 
                                 if (conf.getPositionGapDetectionEnabled() && 
gapDetected) {
-                                    setReadAheadError(tracker);
+                                    setReadAheadError(tracker, new 
UnexpectedException(
+                                            "Unexpected last entry id during 
read ahead : " + currentMetadata
+                                                    + ", lac = " + 
lastAddConfirmed));
                                 } else {
                                     // This disconnect will only surface 
during repositioning and
                                     // will not silently miss records; 
therefore its safe to not halt
@@ -1003,14 +1009,16 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
                                     }
                                     LogSegmentMetadata oldMetadata = 
currentMetadata;
                                     currentMetadata = null;
-                                    if (currentMetadataIndex + 1 < 
ledgerList.size()) {
-                                        currentMetadata = 
ledgerList.get(++currentMetadataIndex);
+                                    if (currentMetadataIndex + 1 < 
logSegmentList.size()) {
+                                        currentMetadata = 
logSegmentList.get(++currentMetadataIndex);
                                         if 
(currentMetadata.getLogSegmentSequenceNumber() != 
(oldMetadata.getLogSegmentSequenceNumber() + 1)) {
                                             // We should never get here as we 
should have exited the loop if
                                             // pendingRequests were empty
                                             alertStatsLogger.raise("Unexpected 
condition during read ahead; {} , {}",
                                                 currentMetadata, oldMetadata);
-                                            setReadAheadError(tracker);
+                                            setReadAheadError(tracker, new 
UnexpectedException(
+                                                    "Unexpected condition 
during read ahead : current metadata "
+                                                            + currentMetadata 
+ ", old metadata " + oldMetadata));
                                         } else {
                                             if (currentMetadata.isTruncated()) 
{
                                                 if 
(conf.getAlertWhenPositioningOnTruncated()) {
@@ -1345,37 +1353,26 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
     }
 
     @Override
-    public void process(WatchedEvent event) {
-        if (zkNotificationDisabled) {
-            return;
+    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+        AsyncNotification notification;
+        synchronized (notificationLock) {
+            logSegmentListNotified = segments;
+            reInitializeMetadata = true;
+            LOG.debug("{} Read ahead node changed", fullyQualifiedName);
+            notification = metadataNotification;
+            metadataNotification = null;
         }
-
-        if ((event.getType() == Watcher.Event.EventType.None)
-                && (event.getState() == 
Watcher.Event.KeeperState.SyncConnected)) {
-            LOG.debug("Reconnected ...");
-        } else if (((event.getType() == Event.EventType.None) && 
(event.getState() == Event.KeeperState.Expired)) ||
-                   ((event.getType() == Event.EventType.NodeChildrenChanged))) 
{
-            AsyncNotification notification;
-            synchronized (notificationLock) {
-                reInitializeMetadata = true;
-                LOG.debug("{} Read ahead node changed", fullyQualifiedName);
-                notification = metadataNotification;
-                metadataNotification = null;
-            }
-            metadataNotificationTimeMillis = System.currentTimeMillis();
-            if (null != notification) {
-                notification.notifyOnOperationComplete();
-            }
-        } else if (event.getType() == Event.EventType.NodeDeleted) {
-            logDeleted = true;
-            setReadAheadError(tracker);
+        metadataNotificationTimeMillis = System.currentTimeMillis();
+        if (null != notification) {
+            notification.notifyOnOperationComplete();
         }
     }
 
-    @VisibleForTesting
-    public void disableZKNotification() {
-        LOG.info("{} ZK Notification was disabled", fullyQualifiedName);
-        zkNotificationDisabled = true;
+    @Override
+    public void onLogStreamDeleted() {
+        logDeleted = true;
+        setReadAheadError(tracker, new LogNotFoundException("Log stream "
+                + bkLedgerManager.getFullyQualifiedName() + " is deleted."));
     }
 
     /**
@@ -1424,7 +1421,7 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
         }
 
         @Override
-        public void notifyOnError() {
+        public void notifyOnError(Throwable t) {
             
longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
             execute();
         }
@@ -1477,7 +1474,7 @@ public class ReadAheadWorker implements 
ReadAheadCallback, Runnable, Watcher, As
         abstract void doComplete(boolean success);
 
         @Override
-        public void notifyOnError() {
+        public void notifyOnError(Throwable cause) {
             
longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
             complete(false);
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index 6a5f7a7..6a647a9 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -343,9 +343,10 @@ public class FutureUtils {
      */
     public static Throwable zkException(Throwable throwable, String path) {
         if (throwable instanceof KeeperException) {
-            return throwable;
+            return new ZKException("Encountered zookeeper exception on " + 
path, (KeeperException) throwable);
         } else if (throwable instanceof 
ZooKeeperClient.ZooKeeperConnectionException) {
-            return KeeperException.create(KeeperException.Code.CONNECTIONLOSS, 
path);
+            return new ZKException("Encountered zookeeper connection loss on " 
+ path,
+                    KeeperException.Code.CONNECTIONLOSS);
         } else if (throwable instanceof InterruptedException) {
             return new DLInterruptedException("Interrupted on operating " + 
path, throwable);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index 57b576e..c588cd7 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -18,6 +18,7 @@
 package com.twitter.distributedlog;
 
 import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
@@ -185,7 +186,12 @@ public class DLMTestUtil {
         BKDistributedLogManager dlm = (BKDistributedLogManager) 
createNewDLM(name, conf, uri);
         try {
             BKLogReadHandler readHandler = dlm.createReadHandler();
-            List<LogSegmentMetadata> ledgerList = 
readHandler.getFullLedgerList(true, true);
+            List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+                    readHandler.readLogSegmentsFromStore(
+                            LogSegmentMetadata.COMPARATOR,
+                            LogSegmentFilter.DEFAULT_FILTER,
+                            null)
+            ).getValue();
             LogSegmentMetadata lastSegment = ledgerList.get(ledgerList.size() 
- 1);
             BookKeeperClient bkc = dlm.getWriterBKC();
             LedgerHandle lh = bkc.get().openLedger(lastSegment.getLedgerId(),
@@ -415,10 +421,12 @@ public class DLMTestUtil {
                 conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, 
conf.getBKDigestPW().getBytes());
         String inprogressZnodeName = 
writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo);
         String znodePath = writeHandler.inprogressZNode(lh.getId(), startTxID, 
logSegmentSeqNo);
+        int logSegmentMetadataVersion = 
conf.getDLLedgerMetadataLayoutVersion();
         LogSegmentMetadata l =
             new LogSegmentMetadata.LogSegmentMetadataBuilder(znodePath,
-                    conf.getDLLedgerMetadataLayoutVersion(), lh.getId(), 
startTxID)
+                    logSegmentMetadataVersion, lh.getId(), startTxID)
                 .setLogSegmentSequenceNo(logSegmentSeqNo)
+                
.setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion))
                 .build();
         l.write(dlm.writerZKC);
         writeHandler.maxTxId.store(startTxID);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
index ae71a1e..fb69c8d 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java
@@ -65,7 +65,7 @@ class NonBlockingReadsTestUtil {
         try {
             LOG.info("Created reader reading from {}", dlm.getStreamName());
             if (forceStall) {
-                reader.disableReadAheadZKNotification();
+                reader.disableReadAheadLogSegmentsNotification();
             }
 
             long numTrans = 0;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 38aaa5b..0c7f346 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -86,6 +86,7 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
         this.testConf = new DistributedLogConfiguration();
         this.testConf.loadConf(conf);
         this.testConf.setReaderIdleErrorThresholdMillis(1200000);
+        this.testConf.setReadAheadWaitTimeOnEndOfStream(20);
     }
 
     @Rule
@@ -816,7 +817,8 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
     @Ignore
     @Test(timeout = 120000)
     public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception {
-        int count = 50;
+        // int count = 50;
+        int count = 1;
         String name = runtime.getMethodName();
         DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
         confLocal.loadConf(testConf);
@@ -1484,9 +1486,8 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
     }
 
     @DistributedLogAnnotations.FlakyTest
-    @Ignore
     @Test(timeout = 60000)
-    public void testAsyncReadMissingZKNotification() throws Exception {
+    public void testAsyncReadMissingLogSegmentsNotification() throws Exception 
{
         String name = "distrlog-async-reader-missing-zk-notification";
         DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
         confLocal.loadConf(testConf);
@@ -1501,6 +1502,7 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
         final int segmentSize = 10;
         final int numSegments = 3;
         final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch readLatch = new CountDownLatch(1);
         final ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(1);
         executor.schedule(
                 new Runnable() {
@@ -1515,6 +1517,9 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
                                     
writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++));
                                     if ((i == 0) && (j == 1)) {
                                         latch.countDown();
+                                    } else {
+                                        // wait for reader to start
+                                        readLatch.await();
                                     }
                                 }
                                 writer.closeAndComplete();
@@ -1530,13 +1535,16 @@ public class TestAsyncReaderWriter extends 
TestDistributedLogBase {
 
         latch.await();
         BKAsyncLogReaderDLSN reader = 
(BKAsyncLogReaderDLSN)dlm.getAsyncLogReader(DLSN.InitialDLSN);
-        reader.disableReadAheadZKNotification();
+        reader.disableReadAheadLogSegmentsNotification();
         boolean exceptionEncountered = false;
         int recordCount = 0;
         try {
             while (true) {
                 Future<LogRecordWithDLSN> record = reader.readNext();
                 Await.result(record);
+                if (recordCount == 0) {
+                    readLatch.countDown();
+                }
                 recordCount++;
 
                 if (recordCount >= segmentSize * numSegments) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
index c0789ec..6ad9950 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java
@@ -979,13 +979,18 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
                     return;
                 }
                 if (updates >= 1) {
-                    if (segments.get(0).getLogSegmentSequenceNumber() != 
updates) {
+                    if (segments.get(segments.size() - 
1).getLogSegmentSequenceNumber() != updates) {
                         numFailures.incrementAndGet();
                     }
                 }
                 receivedStreams.set(segments);
                 latches[updates].countDown();
             }
+
+            @Override
+            public void onLogStreamDeleted() {
+                // no-op
+            }
         });
         LOG.info("Registered listener for stream {}.", name);
         long txid = 1;
@@ -1006,12 +1011,12 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         assertEquals(0, numFailures.get());
         assertNotNull(receivedStreams.get());
         assertEquals(numSegments, receivedStreams.get().size());
-        int seqno = numSegments;
+        int seqno = 1;
         for (LogSegmentMetadata m : receivedStreams.get()) {
             assertEquals(seqno, m.getLogSegmentSequenceNumber());
             assertEquals((seqno - 1) * DEFAULT_SEGMENT_SIZE + 1, 
m.getFirstTxId());
             assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId());
-            --seqno;
+            ++seqno;
         }
     }
 
@@ -1122,6 +1127,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         confLocal.loadConf(conf);
         
confLocal.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
         confLocal.setOutputBufferSize(0);
+        confLocal.setLogSegmentCacheEnabled(false);
 
         LogSegmentMetadataStore metadataStore = new 
ZKLogSegmentMetadataStore(confLocal, zookeeperClient, scheduler);
 
@@ -1174,7 +1180,8 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         {
             LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
             LogRecordWithDLSN record = reader.readNext(false);
-            assertTrue((record != null) && (record.getDlsn().compareTo(new 
DLSN(2, 0, 0)) == 0));
+            assertTrue("Unexpected record : " + record,
+                    (record != null) && (record.getDlsn().compareTo(new 
DLSN(2, 0, 0)) == 0));
             reader.close();
         }
 
@@ -1225,7 +1232,7 @@ public class TestBKDistributedLogManager extends 
TestDistributedLogBase {
         BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
         Assert.assertTrue(Await.result(writer.truncate(truncDLSN)));
         BKLogWriteHandler handler = writer.getCachedWriteHandler();
-        List<LogSegmentMetadata> cachedSegments = 
handler.getFullLedgerList(false, false);
+        List<LogSegmentMetadata> cachedSegments = 
handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
         for (LogSegmentMetadata segment: cachedSegments) {
             if (segment.getLastDLSN().compareTo(truncDLSN) < 0) {
                 Assert.assertTrue(segment.isTruncated());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
index 48c07ed..4b17500 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
 import com.google.common.base.Optional;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Duration;
@@ -28,12 +29,9 @@ import com.twitter.util.Await;
 
 import java.util.List;
 import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import com.twitter.util.TimeoutException;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -89,76 +87,6 @@ public class TestBKLogReadHandler extends 
TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
-    public void testGetLedgerList() throws Exception {
-        String dlName = runtime.getMethodName();
-        prepareLogSegments(dlName, 3, 3);
-        BKDistributedLogManager dlm = createNewDLM(conf, dlName);
-        BKLogReadHandler readHandler = dlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, 
false, LogSegmentMetadata.COMPARATOR, false);
-        List<LogSegmentMetadata> ledgerList2 = 
readHandler.getFilteredLedgerList(true, false);
-        List<LogSegmentMetadata> ledgerList3 = 
readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false);
-        assertEquals(3, ledgerList.size());
-        assertEquals(3, ledgerList2.size());
-        assertEquals(3, ledgerList3.size());
-        for (int i=0; i<3; i++) {
-            assertEquals(ledgerList3.get(i), ledgerList2.get(i));
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testForceGetLedgerList() throws Exception {
-        String dlName = runtime.getMethodName();
-        prepareLogSegments(dlName, 3, 3);
-        BKDistributedLogManager dlm = createNewDLM(conf, dlName);
-        BKLogReadHandler readHandler = dlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(true, 
false, LogSegmentMetadata.COMPARATOR, false);
-        final AtomicReference<List<LogSegmentMetadata>> resultHolder =
-                new AtomicReference<List<LogSegmentMetadata>>(null);
-        final CountDownLatch latch = new CountDownLatch(1);
-        readHandler.asyncGetLedgerList(LogSegmentMetadata.COMPARATOR, null, 
new BookkeeperInternalCallbacks.GenericCallback<List<LogSegmentMetadata>>() {
-            @Override
-            public void operationComplete(int rc, List<LogSegmentMetadata> 
result) {
-                resultHolder.set(result);
-                latch.countDown();
-            }
-        });
-        latch.await();
-        List<LogSegmentMetadata> newLedgerList = resultHolder.get();
-        assertNotNull(newLedgerList);
-        LOG.info("Force sync get list : {}", ledgerList);
-        LOG.info("Async get list : {}", newLedgerList);
-        assertEquals(3, ledgerList.size());
-        assertEquals(3, newLedgerList.size());
-        for (int i=0; i<3; i++) {
-            assertEquals(ledgerList.get(i), newLedgerList.get(i));
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testGetFilteredLedgerListInWriteHandler() throws Exception {
-        String dlName = runtime.getMethodName();
-        prepareLogSegments(dlName, 11, 3);
-        BKDistributedLogManager dlm = createNewDLM(conf, dlName);
-
-        // Get full list.
-        BKLogWriteHandler writeHandler0 = dlm.createWriteHandler(false);
-        List<LogSegmentMetadata> cachedFullLedgerList =
-                
writeHandler0.getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
-        assertTrue(cachedFullLedgerList.size() <= 1);
-        List<LogSegmentMetadata> fullLedgerList = 
writeHandler0.getFullLedgerListDesc(false, false);
-        assertEquals(11, fullLedgerList.size());
-
-        // Get filtered list.
-        BKLogWriteHandler writeHandler1 = dlm.createWriteHandler(false);
-        List<LogSegmentMetadata> filteredLedgerListDesc = 
writeHandler1.getFilteredLedgerListDesc(false, false);
-        assertEquals(1, filteredLedgerListDesc.size());
-        assertEquals(fullLedgerList.get(0), filteredLedgerListDesc.get(0));
-        List<LogSegmentMetadata> filteredLedgerList = 
writeHandler1.getFilteredLedgerList(false, false);
-        assertEquals(1, filteredLedgerList.size());
-        assertEquals(fullLedgerList.get(0), filteredLedgerList.get(0));
-    }
-
-    @Test(timeout = 60000)
     public void testGetFirstDLSNWithOpenLedger() throws Exception {
         String dlName = runtime.getMethodName();
 
@@ -362,7 +290,13 @@ public class TestBKLogReadHandler extends 
TestDistributedLogBase {
         Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, 
false)));
 
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, 
false, LogSegmentMetadata.COMPARATOR, false);
+        List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+                readHandler.readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null
+                )
+        ).getValue();
         assertEquals(1, ledgerList.size());
         assertTrue(ledgerList.get(0).isInProgress());
 
@@ -386,7 +320,12 @@ public class TestBKLogReadHandler extends 
TestDistributedLogBase {
         Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, 
false)));
 
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, 
false, LogSegmentMetadata.COMPARATOR, false);
+        List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+                readHandler.readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null)
+        ).getValue();
         assertEquals(2, ledgerList.size());
         assertFalse(ledgerList.get(0).isInProgress());
         assertTrue(ledgerList.get(1).isInProgress());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
index d28e2c6..71b6834 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java
@@ -139,14 +139,6 @@ public class TestReadAhead extends TestDistributedLogBase {
         }
         Thread.sleep(1000);
 
-        // Expire the session, so the readahead should be awaken from backoff
-        
ZooKeeperClientUtils.expireSession(reader.bkLedgerManager.zooKeeperClient, 
zkServers, 1000);
-        AsyncNotification notification2;
-        do {
-            Thread.sleep(200);
-            notification2 = 
reader.bkLedgerManager.readAheadWorker.getMetadataNotification();
-        } while (null == notification2 || notification1 == notification2);
-
         // write another record
         BKSyncLogWriter writer =
                     (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
index e014311..998c7ba 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Await;
@@ -91,7 +92,12 @@ public class TestReadUtils extends TestDistributedLogBase {
 
     private Future<LogRecordWithDLSN> 
getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception 
{
         BKLogReadHandler readHandler = bkdlm.createReadHandler();
-        List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, 
false, LogSegmentMetadata.COMPARATOR, false);
+        List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+                readHandler.readLogSegmentsFromStore(
+                        LogSegmentMetadata.COMPARATOR,
+                        LogSegmentFilter.DEFAULT_FILTER,
+                        null)
+        ).getValue();
         final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder()
                 .bkc(bkdlm.getWriterBKC())
                 .conf(conf)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
index 555b1b0..a7dead4 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
@@ -99,6 +99,7 @@ public class TestDLCK extends TestDistributedLogBase {
         confLocal.setImmediateFlushEnabled(true);
         confLocal.setOutputBufferSize(0);
         confLocal.setLogSegmentSequenceNumberValidationEnabled(false);
+        confLocal.setLogSegmentCacheEnabled(false);
         URI uri = createDLMURI("/check-and-repair-dl-namespace");
         zkc.get().create(uri.getPath(), new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         com.twitter.distributedlog.DistributedLogManagerFactory factory =

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
index 6e6b83a..66d7228 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -46,7 +47,6 @@ import 
com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Future;
-import com.twitter.util.TimeoutException;
 
 import static org.junit.Assert.*;
 
@@ -81,11 +81,19 @@ public class TestDistributedLogAdmin extends 
TestDistributedLogBase {
         DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
         confLocal.addConfiguration(conf);
         confLocal.setLogSegmentSequenceNumberValidationEnabled(false);
+        confLocal.setLogSegmentCacheEnabled(false);
+
+        DistributedLogConfiguration readConf = new 
DistributedLogConfiguration();
+        readConf.addConfiguration(conf);
+        readConf.setLogSegmentCacheEnabled(false);
+        readConf.setLogSegmentSequenceNumberValidationEnabled(true);
 
         URI uri = createDLMURI("/change-sequence-number");
         zooKeeperClient.get().create(uri.getPath(), new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         com.twitter.distributedlog.DistributedLogManagerFactory factory =
                 new 
com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri);
+        com.twitter.distributedlog.DistributedLogManagerFactory readFactory =
+                new 
com.twitter.distributedlog.DistributedLogManagerFactory(readConf, uri);
 
         String streamName = "change-sequence-number";
 
@@ -96,61 +104,64 @@ public class TestDistributedLogAdmin extends 
TestDistributedLogBase {
         dlm.close();
 
         // create a reader
-        DistributedLogManager readDLM = 
factory.createDistributedLogManagerWithSharedClients(streamName);
+        DistributedLogManager readDLM = 
readFactory.createDistributedLogManagerWithSharedClients(streamName);
         AsyncLogReader reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN);
 
         // read the records
         long expectedTxId = 1L;
+        DLSN lastDLSN = DLSN.InitialDLSN;
         for (int i = 0; i < 4 * 10; i++) {
-            LogRecord record = Await.result(reader.readNext());
+            LogRecordWithDLSN record = Await.result(reader.readNext());
             assertNotNull(record);
             DLMTestUtil.verifyLogRecord(record);
             assertEquals(expectedTxId, record.getTransactionId());
             expectedTxId++;
+            lastDLSN = record.getDlsn();
         }
 
+        LOG.info("Injecting bad log segment '3'");
+
         dlm = factory.createDistributedLogManagerWithSharedClients(streamName);
         DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 
3L, 5 * 10 + 1, true, 10, false);
 
-        // Wait for reader to be aware of new log segments
-        TimeUnit.SECONDS.sleep(2);
+        LOG.info("Injected bad log segment '3'");
 
-        DLSN dlsn = readDLM.getLastDLSN();
-        assertTrue(dlsn.compareTo(new DLSN(5, Long.MIN_VALUE, Long.MIN_VALUE)) 
< 0);
-        assertTrue(dlsn.compareTo(new DLSN(4, -1, Long.MIN_VALUE)) > 0);
         // there isn't records should be read
         Future<LogRecordWithDLSN> readFuture = reader.readNext();
         try {
-            Await.result(readFuture, Duration.fromMilliseconds(1000));
-            fail("Should fail reading next when there is a corrupted log 
segment");
-        } catch (TimeoutException te) {
+            LogRecordWithDLSN record = Await.result(readFuture);
+            fail("Should fail reading next record "
+                    + record
+                    + " when there is a corrupted log segment");
+        } catch (UnexpectedException ue) {
             // expected
         }
 
+        LOG.info("Dryrun fix inprogress segment that has lower sequence 
number");
+
         // Dryrun
         
DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory,
                 new DryrunLogSegmentMetadataStoreUpdater(confLocal, 
getLogSegmentMetadataStore(factory)), streamName, false, false);
 
-        // Wait for reader to be aware of new log segments
-        TimeUnit.SECONDS.sleep(2);
-
-        dlsn = readDLM.getLastDLSN();
-        assertTrue(dlsn.compareTo(new DLSN(5, Long.MIN_VALUE, Long.MIN_VALUE)) 
< 0);
-        assertTrue(dlsn.compareTo(new DLSN(4, -1, Long.MIN_VALUE)) > 0);
-        // there isn't records should be read
         try {
-            Await.result(readFuture, Duration.fromMilliseconds(1000));
+            reader = readDLM.getAsyncLogReader(lastDLSN);
+            Await.result(reader.readNext());
             fail("Should fail reading next when there is a corrupted log 
segment");
-        } catch (TimeoutException te) {
+        } catch (UnexpectedException ue) {
             // expected
         }
 
+        LOG.info("Actual run fix inprogress segment that has lower sequence 
number");
+
         // Actual run
         
DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory,
                 
LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, 
getLogSegmentMetadataStore(factory)), streamName, false, false);
 
-        // Wait for reader to be aware of new log segments
-        TimeUnit.SECONDS.sleep(2);
+        // the reader should be able to read more records after the fix
+        reader = readDLM.getAsyncLogReader(lastDLSN);
+        // skip the first record
+        Await.result(reader.readNext());
+        readFuture = reader.readNext();
 
         expectedTxId = 51L;
         LogRecord record = Await.result(readFuture);
@@ -167,15 +178,11 @@ public class TestDistributedLogAdmin extends 
TestDistributedLogBase {
             expectedTxId++;
         }
 
-        dlsn = readDLM.getLastDLSN();
-        LOG.info("LastDLSN after fix inprogress segment : {}", dlsn);
-        assertTrue(dlsn.compareTo(new DLSN(7, Long.MIN_VALUE, Long.MIN_VALUE)) 
< 0);
-        assertTrue(dlsn.compareTo(new DLSN(6, -1, Long.MIN_VALUE)) > 0);
-
         Utils.close(reader);
         readDLM.close();
 
         dlm.close();
         factory.close();
+        readFactory.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index f8fd3eb..d874274 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -311,7 +312,8 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Collections.sort(children);
         assertEquals("Should find 10 log segments",
                 10, children.size());
-        List<String> logSegmentNames = 
FutureUtils.result(lsmStore.getLogSegmentNames(rootPath));
+        List<String> logSegmentNames =
+                FutureUtils.result(lsmStore.getLogSegmentNames(rootPath, 
null)).getValue();
         Collections.sort(logSegmentNames);
         assertEquals("Should find 10 log segments",
                 10, logSegmentNames.size());
@@ -331,9 +333,13 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testRegisterListenerAfterLSMStoreClosed() throws Exception {
         lsmStore.close();
         LogSegmentMetadata segment = createLogSegment(1L);
-        lsmStore.registerLogSegmentListener(segment.getZkPath(), new 
LogSegmentNamesListener() {
+        lsmStore.getLogSegmentNames(segment.getZkPath(), new 
LogSegmentNamesListener() {
             @Override
-            public void onSegmentsUpdated(List<String> segments) {
+            public void onSegmentsUpdated(Versioned<List<String>> segments) {
+                // no-op;
+            }
+            @Override
+            public void onLogStreamDeleted() {
                 // no-op;
             }
         });
@@ -358,13 +364,17 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         final List<List<String>> segmentLists = 
Lists.newArrayListWithExpectedSize(2);
         LogSegmentNamesListener listener = new LogSegmentNamesListener() {
             @Override
-            public void onSegmentsUpdated(List<String> segments) {
+            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                 logger.info("Received segments : {}", segments);
-                segmentLists.add(segments);
+                segmentLists.add(segments.getValue());
                 numNotifications.incrementAndGet();
             }
+            @Override
+            public void onLogStreamDeleted() {
+                // no-op;
+            }
         };
-        lsmStore.registerLogSegmentListener(rootPath, listener);
+        lsmStore.getLogSegmentNames(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
         assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
@@ -420,13 +430,18 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         final List<List<String>> segmentLists = 
Lists.newArrayListWithExpectedSize(2);
         LogSegmentNamesListener listener = new LogSegmentNamesListener() {
             @Override
-            public void onSegmentsUpdated(List<String> segments) {
+            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                 logger.info("Received segments : {}", segments);
-                segmentLists.add(segments);
+                segmentLists.add(segments.getValue());
                 numNotifications.incrementAndGet();
             }
+
+            @Override
+            public void onLogStreamDeleted() {
+                // no-op;
+            }
         };
-        lsmStore.registerLogSegmentListener(rootPath, listener);
+        lsmStore.getLogSegmentNames(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
         assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
@@ -487,13 +502,18 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         final List<List<String>> segmentLists = 
Lists.newArrayListWithExpectedSize(2);
         LogSegmentNamesListener listener = new LogSegmentNamesListener() {
             @Override
-            public void onSegmentsUpdated(List<String> segments) {
+            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                 logger.info("Received segments : {}", segments);
-                segmentLists.add(segments);
+                segmentLists.add(segments.getValue());
                 numNotifications.incrementAndGet();
             }
+
+            @Override
+            public void onLogStreamDeleted() {
+                // no-op;
+            }
         };
-        lsmStore.registerLogSegmentListener(rootPath, listener);
+        lsmStore.getLogSegmentNames(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
         assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
@@ -536,6 +556,80 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     }
 
     @Test(timeout = 60000)
+    public void testLogSegmentNamesListenerOnDeletingLogStream() throws 
Exception {
+        int numSegments = 3;
+        Transaction<Object> createTxn = lsmStore.transaction();
+        for (int i = 0; i < numSegments; i++) {
+            LogSegmentMetadata segment = createLogSegment(i);
+            lsmStore.createLogSegment(createTxn, segment);
+        }
+        FutureUtils.result(createTxn.execute());
+        String rootPath = "/" + runtime.getMethodName();
+        List<String> children = zkc.get().getChildren(rootPath, false);
+        Collections.sort(children);
+
+        final AtomicInteger numNotifications = new AtomicInteger(0);
+        final List<List<String>> segmentLists = 
Lists.newArrayListWithExpectedSize(2);
+        final CountDownLatch deleteLatch = new CountDownLatch(1);
+        LogSegmentNamesListener listener = new LogSegmentNamesListener() {
+            @Override
+            public void onSegmentsUpdated(Versioned<List<String>> segments) {
+                logger.info("Received segments : {}", segments);
+                segmentLists.add(segments.getValue());
+                numNotifications.incrementAndGet();
+            }
+
+            @Override
+            public void onLogStreamDeleted() {
+                deleteLatch.countDown();
+            }
+        };
+        lsmStore.getLogSegmentNames(rootPath, listener);
+        assertEquals(1, lsmStore.listeners.size());
+        assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
+        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
+        while (numNotifications.get() < 1) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals("Should receive one segment list update",
+                1, numNotifications.get());
+        List<String> firstSegmentList = segmentLists.get(0);
+        Collections.sort(firstSegmentList);
+        assertEquals("List of segments should be same",
+                children, firstSegmentList);
+
+        // delete all log segments; this should trigger a segment list update
+        Transaction<Object> deleteTxn = lsmStore.transaction();
+        for (int i = 0; i < numSegments; i++) {
+            LogSegmentMetadata segment = createLogSegment(i);
+            lsmStore.deleteLogSegment(deleteTxn, segment);
+        }
+        FutureUtils.result(deleteTxn.execute());
+        List<String> newChildren = zkc.get().getChildren(rootPath, false);
+        Collections.sort(newChildren);
+        while (numNotifications.get() < 2) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals("Should receive second segment list update",
+                2, numNotifications.get());
+        List<String> secondSegmentList = segmentLists.get(1);
+        Collections.sort(secondSegmentList);
+        assertEquals("List of segments should be updated",
+                0, secondSegmentList.size());
+        assertEquals("List of segments should be updated",
+                newChildren, secondSegmentList);
+
+        // delete the root path
+        zkc.get().delete(rootPath, -1);
+        while (!lsmStore.listeners.isEmpty()) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertTrue("listener should be removed after root path is deleted",
+                lsmStore.listeners.isEmpty());
+        deleteLatch.await();
+    }
+
+    @Test(timeout = 60000)
     public void testStoreMaxLogSegmentSequenceNumber() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java
deleted file mode 100644
index 3282f5c..0000000
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.logsegment;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Test Case for Log Segment Cache.
- */
-public class TestLogSegmentCache {
-
-    @Test(timeout = 60000)
-    public void testBasicOperations() {
-        LogSegmentMetadata metadata =
-                DLMTestUtil.completedLogSegment("/segment1", 1L, 1L, 100L, 
100, 1L, 99L, 0L);
-        String name = 
DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L);
-
-        LogSegmentCache cache = new LogSegmentCache("test-basic-operations");
-        assertNull("No log segment " + name + " should be cached", 
cache.get(name));
-        cache.add(name, metadata);
-        LogSegmentMetadata metadataRetrieved = cache.get(name);
-        assertNotNull("log segment " + name + " should be cached", 
metadataRetrieved);
-        assertEquals("Wrong log segment metadata returned for " + name,
-                metadata, metadataRetrieved);
-        LogSegmentMetadata metadataRemoved = cache.remove(name);
-        assertNull("log segment " + name + " should be removed from cache", 
cache.get(name));
-        assertEquals("Wrong log segment metadata removed for " + name,
-                metadata, metadataRemoved);
-        assertNull("No log segment " + name + " to be removed", 
cache.remove(name));
-    }
-
-    @Test(timeout = 60000)
-    public void testDiff() {
-        LogSegmentCache cache = new LogSegmentCache("test-diff");
-        // add 5 completed log segments
-        for (int i = 1; i <= 5; i++) {
-            LogSegmentMetadata metadata =
-                    DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 
100L, 100, i, 99L, 0L);
-            String name = 
DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i);
-            cache.add(name, metadata);
-        }
-        // add one inprogress log segment
-        LogSegmentMetadata inprogress =
-                DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6);
-        String name = DLMTestUtil.inprogressZNodeName(6);
-        cache.add(name, inprogress);
-
-        // deleted first 2 completed log segments and completed the last one
-        Set<String> segmentRemoved = Sets.newHashSet();
-        for (int i = 1; i <= 2; i++) {
-            
segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
-        }
-        segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6)));
-        Set<String> segmentReceived = Sets.newHashSet();
-        Set<String> segmentAdded = Sets.newHashSet();
-        for (int i = 3; i <= 6; i++) {
-            
segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
-            if (i == 6) {
-                
segmentAdded.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
-            }
-        }
-
-        Pair<Set<String>, Set<String>> segmentChanges = 
cache.diff(segmentReceived);
-        assertTrue("Should remove " + segmentRemoved + ", but removed " + 
segmentChanges.getRight(),
-                Sets.difference(segmentRemoved, 
segmentChanges.getRight()).isEmpty());
-        assertTrue("Should add " + segmentAdded + ", but added " + 
segmentChanges.getLeft(),
-                Sets.difference(segmentAdded, 
segmentChanges.getLeft()).isEmpty());
-    }
-
-    @Test(timeout = 60000)
-    public void testUpdate() {
-        LogSegmentCache cache = new LogSegmentCache("test-update");
-        // add 5 completed log segments
-        for (int i = 1; i <= 5; i++) {
-            LogSegmentMetadata metadata =
-                    DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 
100L, 100, i, 99L, 0L);
-            String name = 
DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i);
-            cache.add(name, metadata);
-        }
-        // add one inprogress log segment
-        LogSegmentMetadata inprogress =
-                DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6);
-        String name = DLMTestUtil.inprogressZNodeName(6);
-        cache.add(name, inprogress);
-
-        // deleted first 2 completed log segments and completed the last one
-        Set<String> segmentRemoved = Sets.newHashSet();
-        for (int i = 1; i <= 2; i++) {
-            
segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
-        }
-        segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6)));
-        Set<String> segmentReceived = Sets.newHashSet();
-        Map<String, LogSegmentMetadata> segmentAdded = Maps.newHashMap();
-        for (int i = 3; i <= 6; i++) {
-            
segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
-            if (i == 6) {
-                
segmentAdded.put(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i),
-                        DLMTestUtil.completedLogSegment("/segment" + i, i, i, 
i * 100L, 100, i, 99L, 0L));
-            }
-        }
-
-        // update the cache
-        cache.update(segmentRemoved, segmentAdded);
-        for (String segment : segmentRemoved) {
-            assertNull("Segment " + segment + " should be removed.", 
cache.get(segment));
-        }
-        for (String segment : segmentReceived) {
-            assertNotNull("Segment " + segment + " should not be removed", 
cache.get(segment));
-        }
-        for (Map.Entry<String, LogSegmentMetadata> entry : 
segmentAdded.entrySet()) {
-            assertEquals("Segment " + entry.getKey() + " should be added.",
-                    entry.getValue(), entry.getValue());
-        }
-    }
-
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testGapDetection() throws Exception {
-        LogSegmentCache cache = new LogSegmentCache("test-gap-detection");
-        
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L),
-                DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 
100, 1L, 99L, 0L));
-        
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L),
-                DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 
100, 3L, 99L, 0L));
-        cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
-    }
-
-    @Test(timeout = 60000)
-    public void testGapDetectionOnLogSegmentsWithoutLogSegmentSequenceNumber() 
throws Exception {
-        LogSegmentCache cache = new LogSegmentCache("test-gap-detection");
-        LogSegmentMetadata segment1 =
-                DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 
100, 1L, 99L, 0L)
-                        
.mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V1_ORIGINAL).build();
-        
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), 
segment1);
-        LogSegmentMetadata segment3 =
-                DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 
100, 3L, 99L, 0L)
-                        
.mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO).build();
-        
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), 
segment3);
-        List<LogSegmentMetadata> expectedList = Lists.asList(segment1, new 
LogSegmentMetadata[] { segment3 });
-        List<LogSegmentMetadata> resultList = 
cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
-        assertEquals(expectedList, resultList);
-    }
-
-    @Test(timeout = 60000)
-    public void testSameLogSegment() throws Exception {
-        LogSegmentCache cache = new LogSegmentCache("test-same-log-segment");
-        List<LogSegmentMetadata> expectedList = 
Lists.newArrayListWithExpectedSize(2);
-        LogSegmentMetadata inprogress =
-                DLMTestUtil.inprogressLogSegment("/inprogress-1", 1L, 1L, 1L);
-        expectedList.add(inprogress);
-        cache.add(DLMTestUtil.inprogressZNodeName(1L), inprogress);
-        LogSegmentMetadata completed =
-                DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 
100, 1L, 99L, 0L);
-        expectedList.add(completed);
-        
cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), 
completed);
-
-        List<LogSegmentMetadata> retrievedList = 
cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
-        assertEquals("Should get both log segments in ascending order",
-                expectedList.size(), retrievedList.size());
-        for (int i = 0; i < expectedList.size(); i++) {
-            assertEqualsWithoutSequenceId(expectedList.get(i), 
retrievedList.get(i));
-        }
-        assertEquals("inprogress log segment should see start sequence id : 0",
-                0L, retrievedList.get(0).getStartSequenceId());
-        Collections.reverse(expectedList);
-        retrievedList = 
cache.getLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
-        assertEquals("Should get both log segments in descending order",
-                expectedList.size(), retrievedList.size());
-        for (int i = 0; i < expectedList.size(); i++) {
-            assertEqualsWithoutSequenceId(expectedList.get(i), 
retrievedList.get(i));
-        }
-        assertEquals("inprogress log segment should see start sequence id : 0",
-                0L, retrievedList.get(1).getStartSequenceId());
-    }
-
-    private static void assertEqualsWithoutSequenceId(LogSegmentMetadata m1, 
LogSegmentMetadata m2) {
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.getLogSegmentSequenceNumber(), 
m2.getLogSegmentSequenceNumber());
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.getLedgerId(), m2.getLedgerId());
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.getFirstTxId(), m2.getFirstTxId());
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.getLastTxId(), m2.getLastTxId());
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.getLastDLSN(), m2.getLastDLSN());
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.getRecordCount(), m2.getRecordCount());
-        assertEquals("expected " + m1 + " but got " + m2,
-                m1.isInProgress(), m2.isInProgress());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java
new file mode 100644
index 0000000..456ed68
--- /dev/null
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.logsegment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test Case for Per Stream Log Segment Cache.
+ */
+public class TestPerStreamLogSegmentCache {
+
+    /**
+     * Walks one completed log segment through the cache: lookup before it is added,
+     * add, lookup, remove, and a second remove.
+     */
+    @Test(timeout = 60000)
+    public void testBasicOperations() {
+        LogSegmentMetadata expected =
+                DLMTestUtil.completedLogSegment("/segment1", 1L, 1L, 100L, 100, 1L, 99L, 0L);
+        String segmentName = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L);
+        PerStreamLogSegmentCache segmentCache = new PerStreamLogSegmentCache("test-basic-operations");
+
+        // nothing has been added yet
+        assertNull("No log segment " + segmentName + " should be cached", segmentCache.get(segmentName));
+
+        // add the segment and read it back
+        segmentCache.add(segmentName, expected);
+        LogSegmentMetadata cached = segmentCache.get(segmentName);
+        assertNotNull("log segment " + segmentName + " should be cached", cached);
+        assertEquals("Wrong log segment metadata returned for " + segmentName,
+                expected, cached);
+
+        // remove returns the stored metadata and the name is no longer resolvable
+        LogSegmentMetadata evicted = segmentCache.remove(segmentName);
+        assertNull("log segment " + segmentName + " should be removed from cache",
+                segmentCache.get(segmentName));
+        assertEquals("Wrong log segment metadata removed for " + segmentName,
+                expected, evicted);
+
+        // a second remove has nothing left to return
+        assertNull("No log segment " + segmentName + " to be removed", segmentCache.remove(segmentName));
+    }
+
+    /**
+     * Seeds the cache with completed segments 1-5 plus inprogress segment 6, then diffs it
+     * against a received list holding completed segments 3-6. The assertions check that every
+     * expected removal/addition shows up in the corresponding side of the diff result.
+     */
+    @Test(timeout = 60000)
+    public void testDiff() {
+        PerStreamLogSegmentCache segmentCache = new PerStreamLogSegmentCache("test-diff");
+        // add 5 completed log segments
+        for (int seqNo = 1; seqNo <= 5; seqNo++) {
+            LogSegmentMetadata completed =
+                    DLMTestUtil.completedLogSegment("/segment" + seqNo, seqNo, seqNo, seqNo * 100L, 100, seqNo, 99L, 0L);
+            segmentCache.add(
+                    DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(seqNo), completed);
+        }
+        // add one inprogress log segment
+        LogSegmentMetadata inprogress =
+                DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6);
+        segmentCache.add(DLMTestUtil.inprogressZNodeName(6), inprogress);
+
+        // deleted first 2 completed log segments and completed the last one
+        Set<String> expectedRemoved = Sets.newHashSet();
+        for (int seqNo = 1; seqNo <= 2; seqNo++) {
+            expectedRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(seqNo));
+        }
+        expectedRemoved.add(DLMTestUtil.inprogressZNodeName(6));
+
+        // the received list is completed segments 3-6 ...
+        Set<String> received = Sets.newHashSet();
+        for (int seqNo = 3; seqNo <= 6; seqNo++) {
+            received.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(seqNo));
+        }
+        // ... of which only completed segment 6 is new to the cache
+        Set<String> expectedAdded = Sets.newHashSet();
+        expectedAdded.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(6));
+
+        Pair<Set<String>, Set<String>> changes = segmentCache.diff(received);
+        Set<String> actualAdded = changes.getLeft();
+        Set<String> actualRemoved = changes.getRight();
+        assertTrue("Should remove " + expectedRemoved + ", but removed " + actualRemoved,
+                Sets.difference(expectedRemoved, actualRemoved).isEmpty());
+        assertTrue("Should add " + expectedAdded + ", but added " + actualAdded,
+                Sets.difference(expectedAdded, actualAdded).isEmpty());
+    }
+
+    /**
+     * Seeds the cache with completed segments 1-5 plus inprogress segment 6, applies an update
+     * that removes completed 1-2 and inprogress 6 while adding completed 6, and then checks the
+     * cache contents name by name.
+     */
+    @Test(timeout = 60000)
+    public void testUpdate() {
+        PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-update");
+        // add 5 completed log segments
+        for (int i = 1; i <= 5; i++) {
+            LogSegmentMetadata metadata =
+                    DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L);
+            String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i);
+            cache.add(name, metadata);
+        }
+        // add one inprogress log segment
+        LogSegmentMetadata inprogress =
+                DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6);
+        String name = DLMTestUtil.inprogressZNodeName(6);
+        cache.add(name, inprogress);
+
+        // deleted first 2 completed log segments and completed the last one
+        Set<String> segmentRemoved = Sets.newHashSet();
+        for (int i = 1; i <= 2; i++) {
+            segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
+        }
+        segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6)));
+        Set<String> segmentReceived = Sets.newHashSet();
+        Map<String, LogSegmentMetadata> segmentAdded = Maps.newHashMap();
+        for (int i = 3; i <= 6; i++) {
+            segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i));
+            if (i == 6) {
+                segmentAdded.put(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i),
+                        DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L));
+            }
+        }
+
+        // update the cache
+        cache.update(segmentRemoved, segmentAdded);
+        for (String segment : segmentRemoved) {
+            assertNull("Segment " + segment + " should be removed.", cache.get(segment));
+        }
+        for (String segment : segmentReceived) {
+            assertNotNull("Segment " + segment + " should not be removed", cache.get(segment));
+        }
+        for (Map.Entry<String, LogSegmentMetadata> entry : segmentAdded.entrySet()) {
+            // Compare against what the cache actually holds; comparing the entry's value with
+            // itself would pass even if update() never stored the added segment.
+            assertEquals("Segment " + entry.getKey() + " should be added.",
+                    entry.getValue(), cache.get(entry.getKey()));
+        }
+    }
+
+    /**
+     * Caches completed segments with log segment sequence numbers 1 and 3 (none for 2), lists
+     * them, and expects the test to terminate with an {@link UnexpectedException}.
+     */
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testGapDetection() throws Exception {
+        PerStreamLogSegmentCache segmentCache = new PerStreamLogSegmentCache("test-gap-detection");
+
+        String firstName = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L);
+        LogSegmentMetadata firstSegment =
+                DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L);
+        segmentCache.add(firstName, firstSegment);
+
+        String thirdName = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L);
+        LogSegmentMetadata thirdSegment =
+                DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L);
+        segmentCache.add(thirdName, thirdSegment);
+
+        // NOTE(review): presumably this listing call is what reports the 1 -> 3 gap -- confirm
+        segmentCache.getLogSegments(LogSegmentMetadata.COMPARATOR);
+    }
+
+    /**
+     * Caches segments 1 and 3 rebuilt with metadata versions VERSION_V1_ORIGINAL and
+     * VERSION_V2_LEDGER_SEQNO respectively, and expects listing to return both of them, in
+     * that order, without failing.
+     */
+    @Test(timeout = 60000)
+    public void testGapDetectionOnLogSegmentsWithoutLogSegmentSequenceNumber() throws Exception {
+        PerStreamLogSegmentCache segmentCache = new PerStreamLogSegmentCache("test-gap-detection");
+
+        LogSegmentMetadata v1Segment =
+                DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L)
+                        .mutator()
+                        .setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V1_ORIGINAL)
+                        .build();
+        segmentCache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), v1Segment);
+
+        LogSegmentMetadata v2Segment =
+                DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L)
+                        .mutator()
+                        .setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO)
+                        .build();
+        segmentCache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), v2Segment);
+
+        // both segments are expected back in ascending order
+        List<LogSegmentMetadata> wanted = Lists.asList(v1Segment, new LogSegmentMetadata[] { v2Segment });
+        List<LogSegmentMetadata> listed = segmentCache.getLogSegments(LogSegmentMetadata.COMPARATOR);
+        assertEquals(wanted, listed);
+    }
+
+    /**
+     * Caches an inprogress and a completed log segment that both carry log segment sequence
+     * number 1, lists the segments, and expects the test to terminate with an
+     * {@link UnexpectedException}.
+     */
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testSameLogSegment() throws Exception {
+        PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-same-log-segment");
+        LogSegmentMetadata inprogress =
+                DLMTestUtil.inprogressLogSegment("/inprogress-1", 1L, 1L, 1L);
+        cache.add(DLMTestUtil.inprogressZNodeName(1L), inprogress);
+        LogSegmentMetadata completed =
+                DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L);
+        cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), completed);
+
+        // No result is inspected: the test passes only if an UnexpectedException is thrown.
+        // NOTE(review): presumably raised by this listing call rather than by add() -- confirm
+        cache.getLogSegments(LogSegmentMetadata.COMPARATOR);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java
index f9169c6..cbabf2a 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java
@@ -21,6 +21,7 @@ import java.net.URI;
 
 import com.twitter.distributedlog.DLMTestUtil;
 import com.twitter.distributedlog.DLSN;
+import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.TestDistributedLogBase;
 import com.twitter.distributedlog.LocalDLMEmulator;
@@ -205,8 +206,11 @@ public class TestDistributedLogTool extends 
TestDistributedLogBase {
 
     @Test(timeout = 60000)
     public void testToolTruncateStream() throws Exception {
-        DistributedLogManager dlm = 
DLMTestUtil.createNewDLM("testToolTruncateStream", conf, defaultUri);
-        DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 1000);
+        DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setLogSegmentCacheEnabled(false);
+        DistributedLogManager dlm = 
DLMTestUtil.createNewDLM("testToolTruncateStream", confLocal, defaultUri);
+        DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 3, 1000);
 
         DLSN dlsn = new DLSN(2,1,0);
         TruncateStreamCommand cmd = new TruncateStreamCommand();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java
 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java
new file mode 100644
index 0000000..292d135
--- /dev/null
+++ 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.exceptions;
+
+import com.twitter.distributedlog.thrift.service.StatusCode;
+
+/**
+ * Exception raised when a log segment is not found; it carries
+ * {@link StatusCode#LOG_SEGMENT_NOT_FOUND}.
+ */
+public class LogSegmentNotFoundException extends DLException {
+
+    private static final long serialVersionUID = -2482324226595903864L;
+
+    /**
+     * @param logSegmentPath path of the log segment that was not found; it is embedded in
+     *                       the exception message
+     */
+    public LogSegmentNotFoundException(String logSegmentPath) {
+        super(StatusCode.LOG_SEGMENT_NOT_FOUND, describe(logSegmentPath));
+    }
+
+    // Renders the exception message for the given log segment path.
+    private static String describe(String logSegmentPath) {
+        return "Log Segment " + logSegmentPath + " not found";
+    }
+}


Reply via email to