DL-101: Improve session expire handling on fetching log segments for readers
This change focuses on improving session expiry handling when fetching log
segments for readers.
The log segment management in DL is now split into 3 parts.
- a LogSegmentMetadataStore (one per namespace instance): it is used for
fetching the log segments from the log segment metadata store (ZooKeeper). It
doesn't do any caching.
- a LogSegmentMetadataCache (one per namespace instance): it is a Guava-based
metadata cache. It maintains a mapping between the log segment metadata path
and the log segment metadata. It manages the cache for the log segments that
will be accessed in this namespace instance. It doesn't manage the sequence of
the log segments for streams.
- a PerStreamLogSegmentCache for each BKLogHandler. The log segment cache is
per-stream. It maintains the sequence of the log segments.
BKLogWriteHandler doesn't watch the log segment changes. It fetches a minimal
number of log segments when it is created and fetches the full list of log
segments for truncations. New log segments are added to the per-stream log
segment cache on log segment rolling.
BKLogReadHandler watches the log segment changes and only notifies when the
list of log segments has changed. The session handling, which is specific to
the metadata store, is hidden inside the implementations of
LogSegmentMetadataStore.
The change also tries to clean up a bunch of unused methods in
BKLog{Read,Write}Handler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/9ee7d016
Tree:
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/9ee7d016
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/9ee7d016
Branch: refs/heads/master
Commit: 9ee7d016072f45060d14af3e5e6ddd75a92285f6
Parents: 3ea8853
Author: Sijie Guo <[email protected]>
Authored: Thu Jul 28 21:10:33 2016 -0700
Committer: Sijie Guo <[email protected]>
Committed: Tue Dec 27 16:49:26 2016 -0800
----------------------------------------------------------------------
.../distributedlog/AsyncNotification.java | 4 +-
.../distributedlog/BKAsyncLogReaderDLSN.java | 29 +-
.../distributedlog/BKDistributedLogManager.java | 27 +-
.../BKDistributedLogNamespace.java | 5 +
.../twitter/distributedlog/BKLogHandler.java | 827 +++++--------------
.../distributedlog/BKLogReadHandler.java | 253 +++++-
.../distributedlog/BKLogWriteHandler.java | 194 +++--
.../distributedlog/BKSyncLogReaderDLSN.java | 4 +-
.../DistributedLogConfiguration.java | 71 ++
.../distributedlog/LogSegmentMetadata.java | 12 +-
.../callback/LogSegmentListener.java | 7 +-
.../callback/LogSegmentNamesListener.java | 12 +-
.../function/CloseAsyncCloseableFunction.java | 51 ++
.../function/GetVersionedValueFunction.java | 39 +
.../impl/ZKLogSegmentMetadataStore.java | 158 ++--
.../logsegment/LogSegmentCache.java | 228 -----
.../logsegment/LogSegmentMetadataCache.java | 98 +++
.../logsegment/LogSegmentMetadataStore.java | 17 +-
.../logsegment/PerStreamLogSegmentCache.java | 243 ++++++
.../readahead/ReadAheadWorker.java | 171 ++--
.../distributedlog/util/FutureUtils.java | 5 +-
.../com/twitter/distributedlog/DLMTestUtil.java | 12 +-
.../NonBlockingReadsTestUtil.java | 2 +-
.../distributedlog/TestAsyncReaderWriter.java | 16 +-
.../TestBKDistributedLogManager.java | 17 +-
.../distributedlog/TestBKLogReadHandler.java | 89 +-
.../twitter/distributedlog/TestReadAhead.java | 8 -
.../twitter/distributedlog/TestReadUtils.java | 8 +-
.../twitter/distributedlog/admin/TestDLCK.java | 1 +
.../admin/TestDistributedLogAdmin.java | 61 +-
.../impl/TestZKLogSegmentMetadataStore.java | 118 ++-
.../logsegment/TestLogSegmentCache.java | 220 -----
.../TestPerStreamLogSegmentCache.java | 186 +++++
.../tools/TestDistributedLogTool.java | 8 +-
.../exceptions/LogSegmentNotFoundException.java | 32 +
.../src/main/thrift/service.thrift | 2 +
.../distributedlog/service/MonitorService.java | 5 +
37 files changed, 1762 insertions(+), 1478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git
a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
index e7cb601..bd71147 100644
---
a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
+++
b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
@@ -20,8 +20,10 @@ package com.twitter.distributedlog;
public interface AsyncNotification {
/**
* Triggered when the background activity encounters an exception
+ *
+ * @param reason the exception that encountered.
*/
- void notifyOnError();
+ void notifyOnError(Throwable reason);
/**
* Triggered when the background activity completes an operation
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index b1a9273..90230ae 100644
---
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -104,7 +104,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader,
Runnable, AsyncNotificatio
private boolean lockStream = false;
- private boolean disableReadAheadZKNotification = false;
+ private boolean disableReadAheadLogSegmentsNotification = false;
private final boolean returnEndOfStreamRecord;
@@ -400,23 +400,17 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader,
Runnable, AsyncNotificatio
bkLedgerManager.startReadAhead(
new LedgerReadPosition(getStartDLSN()),
failureInjector);
- if (disableReadAheadZKNotification) {
- bkLedgerManager.disableReadAheadZKNotification();
+ if (disableReadAheadLogSegmentsNotification) {
+
bkLedgerManager.disableReadAheadLogSegmentsNotification();
}
} catch (Exception exc) {
- setLastException(new IOException(exc));
- notifyOnError();
+ notifyOnError(exc);
}
}
@Override
public void onFailure(Throwable cause) {
- if (cause instanceof IOException) {
- setLastException((IOException)cause);
- } else {
- setLastException(new IOException(cause));
- }
- notifyOnError();
+ notifyOnError(cause);
}
});
readAheadStarted = true;
@@ -643,7 +637,12 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader,
Runnable, AsyncNotificatio
* Triggered when the background activity encounters an exception
*/
@Override
- public void notifyOnError() {
+ public void notifyOnError(Throwable cause) {
+ if (cause instanceof IOException) {
+ setLastException((IOException) cause);
+ } else {
+ setLastException(new IOException(cause));
+ }
scheduleBackgroundRead();
}
@@ -661,9 +660,9 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader,
Runnable, AsyncNotificatio
}
@VisibleForTesting
- synchronized void disableReadAheadZKNotification() {
- disableReadAheadZKNotification = true;
- bkLedgerManager.disableReadAheadZKNotification();
+ synchronized void disableReadAheadLogSegmentsNotification() {
+ disableReadAheadLogSegmentsNotification = true;
+ bkLedgerManager.disableReadAheadLogSegmentsNotification();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 6a9d860..75a5b83 100644
---
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
@@ -32,6 +33,8 @@ import
com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
+import com.twitter.distributedlog.function.GetVersionedValueFunction;
import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
@@ -41,6 +44,8 @@ import com.twitter.distributedlog.lock.NopDistributedLock;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKDistributedLock;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentFilter;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
@@ -159,6 +164,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
// log segment metadata stores
private final LogSegmentMetadataStore writerMetadataStore;
private final LogSegmentMetadataStore readerMetadataStore;
+ private final LogSegmentMetadataCache logSegmentMetadataCache;
// bookkeeper clients
// NOTE: The actual bookkeeper client is initialized lazily when it is
referenced by
@@ -232,6 +238,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
null,
null,
null,
+ new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
OrderedScheduler.newBuilder().name("BKDL-" +
name).corePoolSize(1).build(),
null,
null,
@@ -293,6 +300,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
SessionLockFactory lockFactory,
LogSegmentMetadataStore writerMetadataStore,
LogSegmentMetadataStore readerMetadataStore,
+ LogSegmentMetadataCache logSegmentMetadataCache,
OrderedScheduler scheduler,
OrderedScheduler readAheadScheduler,
OrderedScheduler lockStateExecutor,
@@ -336,6 +344,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
} else {
this.readerMetadataStore = readerMetadataStore;
}
+ this.logSegmentMetadataCache = logSegmentMetadataCache;
// create the bkc for writers
if (null == writerBKCBuilder) {
@@ -451,7 +460,8 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
private synchronized BKLogReadHandler getReadHandlerForListener(boolean
create) {
if (null == readHandlerForListener && create) {
readHandlerForListener = createReadHandler();
- readHandlerForListener.scheduleGetLedgersTask(true, true);
+ // start fetch the log segments
+ readHandlerForListener.asyncStartFetchLogSegments();
}
return readHandlerForListener;
}
@@ -463,13 +473,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
final BKLogReadHandler readHandler = createReadHandler();
- return readHandler.asyncGetFullLedgerList(true, false).ensure(new
AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- readHandler.asyncClose();
- return BoxedUnit.UNIT;
- }
- });
+ return readHandler.readLogSegmentsFromStore(
+ LogSegmentMetadata.COMPARATOR,
+ LogSegmentFilter.DEFAULT_FILTER,
+ null)
+ .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC)
+ .ensure(CloseAsyncCloseableFunction.of(readHandler));
}
@Override
@@ -534,6 +543,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
readerZKCBuilder,
readerBKCBuilder,
readerMetadataStore,
+ logSegmentMetadataCache,
scheduler,
lockExecutor,
readAheadScheduler,
@@ -634,6 +644,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor
implements DistributedL
writerZKCBuilder,
writerBKCBuilder,
writerMetadataStore,
+ logSegmentMetadataCache,
scheduler,
allocator,
statsLogger,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 2df1046..0f2c222 100644
---
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -41,6 +42,7 @@ import
com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.LogMetadataStore;
@@ -301,6 +303,7 @@ public class BKDistributedLogNamespace implements
DistributedLogNamespace {
// log metadata store
private final LogMetadataStore metadataStore;
// log segment metadata store
+ private final LogSegmentMetadataCache logSegmentMetadataCache;
private final LogSegmentMetadataStore writerSegmentMetadataStore;
private final LogSegmentMetadataStore readerSegmentMetadataStore;
// lock factory
@@ -478,6 +481,7 @@ public class BKDistributedLogNamespace implements
DistributedLogNamespace {
new ZKLogSegmentMetadataStore(conf, sharedWriterZKCForDL,
scheduler);
this.readerSegmentMetadataStore =
new ZKLogSegmentMetadataStore(conf, sharedReaderZKCForDL,
scheduler);
+ this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf,
Ticker.systemTicker());
LOG.info("Constructed BK DistributedLogNamespace : clientId = {},
regionId = {}, federated = {}.",
new Object[] { clientId, regionId,
bkdlConfig.isFederatedNamespace() });
@@ -883,6 +887,7 @@ public class BKDistributedLogNamespace implements
DistributedLogNamespace {
lockFactory, /* Lock Factory */
writerSegmentMetadataStore, /* Log Segment Metadata
Store for DL Writers */
readerSegmentMetadataStore, /* Log Segment Metadata
Store for DL Readers */
+ logSegmentMetadataCache, /* Log Segment Metadata
Cache */
scheduler, /* DL scheduler */
readAheadExecutor, /* Read Aheader Executor */
lockStateExecutor, /* Lock State Executor */
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 460de11..2a6e85b 100644
---
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -19,33 +19,34 @@ package com.twitter.distributedlog;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.callback.LogSegmentListener;
+import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.MetadataException;
+import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
import com.twitter.distributedlog.io.AsyncAbortable;
import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.logsegment.LogSegmentCache;
+import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
+import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
+import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -63,12 +64,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -81,11 +78,6 @@ import java.util.concurrent.atomic.AtomicReference;
* <p>
* Those operations are:
* <ul>
- * <li>force_get_list: force to get the list of log segments.
- * <li>get_list: get the list of the log segments. it might just retrieve from
- * local log segment cache.
- * <li>get_filtered_list: get the filtered list of log segments.
- * <li>get_full_list: get the full list of log segments.
* <li>get_inprogress_segment: time between the inprogress log segment created
and
* the handler read it.
* <li>get_completed_segment: time between a log segment is turned to
completed and
@@ -98,7 +90,7 @@ import java.util.concurrent.atomic.AtomicReference;
* @see BKLogWriteHandler
* @see BKLogReadHandler
*/
-public abstract class BKLogHandler implements Watcher, AsyncCloseable,
AsyncAbortable {
+public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
protected final ZKLogMetadata logMetadata;
@@ -106,41 +98,26 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
protected final ZooKeeperClient zooKeeperClient;
protected final BookKeeperClient bookKeeperClient;
protected final LogSegmentMetadataStore metadataStore;
+ protected final LogSegmentMetadataCache metadataCache;
protected final int firstNumEntriesPerReadLastRecordScan;
protected final int maxNumEntriesPerReadLastRecordScan;
protected volatile long lastLedgerRollingTimeMillis = -1;
protected final OrderedScheduler scheduler;
protected final StatsLogger statsLogger;
protected final AlertStatsLogger alertStatsLogger;
- private final AtomicBoolean ledgerListWatchSet = new AtomicBoolean(false);
- private final AtomicBoolean isFullListFetched = new AtomicBoolean(false);
protected volatile boolean reportGetSegmentStats = false;
private final String lockClientId;
protected final AtomicReference<IOException> metadataException = new
AtomicReference<IOException>(null);
- // listener
- protected final CopyOnWriteArraySet<LogSegmentListener> listeners =
- new CopyOnWriteArraySet<LogSegmentListener>();
+ // Maintain the list of log segments per stream
+ protected final PerStreamLogSegmentCache logSegmentCache;
- // Maintain the list of ledgers
- protected final LogSegmentCache logSegmentCache;
- protected volatile SyncGetLedgersCallback firstGetLedgersTask = null;
- protected final AsyncNotification notification;
- // log segment filter
- protected final LogSegmentFilter filter;
-
- // zookeeper children watcher
- private final Watcher getChildrenWatcher;
// trace
protected final long metadataLatencyWarnThresholdMillis;
// Stats
- private final OpStatsLogger forceGetListStat;
- private final OpStatsLogger getListStat;
- private final OpStatsLogger getFilteredListStat;
- private final OpStatsLogger getFullListStat;
private final OpStatsLogger getInprogressSegmentStat;
private final OpStatsLogger getCompletedSegmentStat;
private final OpStatsLogger negativeGetInprogressSegmentStat;
@@ -148,96 +125,6 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
private final OpStatsLogger recoverLastEntryStats;
private final OpStatsLogger recoverScannedEntriesStats;
- static class SyncGetLedgersCallback implements
GenericCallback<List<LogSegmentMetadata>> {
-
- final String path;
- final boolean allowEmpty;
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- final Promise<List<LogSegmentMetadata>> promise =
- new Promise<List<LogSegmentMetadata>>();
-
- int rc = KeeperException.Code.APIERROR.intValue();
-
- SyncGetLedgersCallback(String path, boolean allowEmpty) {
- this.path = path;
- this.allowEmpty = allowEmpty;
- }
-
- @Override
- public void operationComplete(int rc, List<LogSegmentMetadata>
logSegmentMetadatas) {
- this.rc = rc;
- if (KeeperException.Code.OK.intValue() == rc) {
- LOG.debug("Updated ledgers list for {} : {}", path,
logSegmentMetadatas);
- promise.setValue(logSegmentMetadatas);
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- if (allowEmpty) {
- promise.setValue(new ArrayList<LogSegmentMetadata>(0));
- } else {
- promise.setException(new LogNotFoundException("Log " +
path + " is not found"));
- }
- } else {
- promise.setException(new MetadataException("Error getting
ledgers list for " + path));
- }
- countDownLatch.countDown();
- }
-
- void waitForFinish() throws IOException {
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- throw new DLInterruptedException("Interrupted on getting
ledgers list for " + path, e);
- }
- if (KeeperException.Code.OK.intValue() != rc) {
- if (KeeperException.Code.NONODE.intValue() == rc) {
- if (!allowEmpty) {
- throw new LogNotFoundException("Log " + path + " is
not found");
- }
- } else {
- throw new MetadataException("Error getting ledgers list
for " + path);
- }
- }
- }
- }
-
- static class NOPGetLedgersCallback implements
GenericCallback<List<LogSegmentMetadata>> {
-
- final String path;
-
- NOPGetLedgersCallback(String path) {
- this.path = path;
- }
-
- @Override
- public void operationComplete(int rc, List<LogSegmentMetadata>
logSegmentMetadatas) {
- if (KeeperException.Code.OK.intValue() == rc) {
- LOG.debug("Updated ledgers list : {}", path,
logSegmentMetadatas);
- }
- }
- }
-
- class WatcherGetLedgersCallback implements
GenericCallback<List<LogSegmentMetadata>>, Runnable {
-
- final String path;
-
- WatcherGetLedgersCallback(String path) {
- this.path = path;
- }
-
- @Override
- public void operationComplete(int rc, List<LogSegmentMetadata>
logSegmentMetadatas) {
- if (KeeperException.Code.OK.intValue() == rc) {
- LOG.debug("Updated ledgers list {} : {}", path,
logSegmentMetadatas);
- } else {
- scheduler.schedule(this, conf.getZKRetryBackoffMaxMillis(),
TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- public void run() {
- asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR,
filter, getChildrenWatcher, this);
- }
- }
-
/**
* Construct a Bookkeeper journal manager.
*/
@@ -246,11 +133,10 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
ZooKeeperClientBuilder zkcBuilder,
BookKeeperClientBuilder bkcBuilder,
LogSegmentMetadataStore metadataStore,
+ LogSegmentMetadataCache metadataCache,
OrderedScheduler scheduler,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger,
- AsyncNotification notification,
- LogSegmentFilter filter,
String lockClientId) {
Preconditions.checkNotNull(zkcBuilder);
Preconditions.checkNotNull(bkcBuilder);
@@ -259,9 +145,9 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
this.scheduler = scheduler;
this.statsLogger = statsLogger;
this.alertStatsLogger = alertStatsLogger;
- this.notification = notification;
- this.filter = filter;
- this.logSegmentCache = new LogSegmentCache(metadata.getLogName());
+ this.logSegmentCache = new PerStreamLogSegmentCache(
+ metadata.getLogName(),
+ conf.isLogSegmentSequenceNumberValidationEnabled());
firstNumEntriesPerReadLastRecordScan =
conf.getFirstNumEntriesPerReadLastRecordScan();
maxNumEntriesPerReadLastRecordScan =
conf.getMaxNumEntriesPerReadLastRecordScan();
@@ -269,20 +155,14 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
this.bookKeeperClient = bkcBuilder.build();
this.metadataStore = metadataStore;
+ this.metadataCache = metadataCache;
this.lockClientId = lockClientId;
- this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager()
- .registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
-
// Traces
this.metadataLatencyWarnThresholdMillis =
conf.getMetadataLatencyWarnThresholdMillis();
// Stats
StatsLogger segmentsLogger = statsLogger.scope("logsegments");
- forceGetListStat = segmentsLogger.getOpStatsLogger("force_get_list");
- getListStat = segmentsLogger.getOpStatsLogger("get_list");
- getFilteredListStat =
segmentsLogger.getOpStatsLogger("get_filtered_list");
- getFullListStat = segmentsLogger.getOpStatsLogger("get_full_list");
getInprogressSegmentStat =
segmentsLogger.getOpStatsLogger("get_inprogress_segment");
getCompletedSegmentStat =
segmentsLogger.getOpStatsLogger("get_completed_segment");
negativeGetInprogressSegmentStat =
segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment");
@@ -306,67 +186,25 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
return lockClientId;
}
- protected void registerListener(LogSegmentListener listener) {
- listeners.add(listener);
- }
-
- protected void unregisterListener(LogSegmentListener listener) {
- listeners.remove(listener);
- }
-
- protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments)
{
- for (LogSegmentListener listener : listeners) {
- List<LogSegmentMetadata> listToReturn =
- new ArrayList<LogSegmentMetadata>(segments);
- Collections.sort(listToReturn, LogSegmentMetadata.DESC_COMPARATOR);
- listener.onSegmentsUpdated(listToReturn);
- }
- }
-
- protected void scheduleGetAllLedgersTaskIfNeeded() {
- if (isFullListFetched.get()) {
- return;
- }
- asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR,
LogSegmentFilter.DEFAULT_FILTER,
- null, new NOPGetLedgersCallback(getFullyQualifiedName()));
- }
-
- protected void scheduleGetLedgersTask(boolean watch, boolean allowEmpty) {
- if (!watch) {
- ledgerListWatchSet.set(true);
- }
- LOG.info("Scheduling get ledgers task for {}, watch = {}.",
getFullyQualifiedName(), watch);
- firstGetLedgersTask = new
SyncGetLedgersCallback(getFullyQualifiedName(), allowEmpty);
- asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter,
- watch ? getChildrenWatcher : null, firstGetLedgersTask);
- LOG.info("Scheduled get ledgers task for {}, watch = {}.",
getFullyQualifiedName(), watch);
- }
-
- protected void waitFirstGetLedgersTaskToFinish() throws IOException {
- SyncGetLedgersCallback task = firstGetLedgersTask;
- if (null != task) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Wait first getting ledgers task to finish for {}.",
getFullyQualifiedName());
- }
- task.waitForFinish();
- }
- }
-
public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
final Promise<LogRecordWithDLSN> promise = new
Promise<LogRecordWithDLSN>();
checkLogStreamExistsAsync().addEventListener(new
FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
- asyncGetFullLedgerList(true, true).addEventListener(new
FutureEventListener<List<LogSegmentMetadata>>() {
+ readLogSegmentsFromStore(
+ LogSegmentMetadata.COMPARATOR,
+ LogSegmentFilter.DEFAULT_FILTER,
+ null
+ ).addEventListener(new
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
- public void onSuccess(List<LogSegmentMetadata> ledgerList)
{
- if (ledgerList.isEmpty()) {
+ public void onSuccess(Versioned<List<LogSegmentMetadata>>
ledgerList) {
+ if (ledgerList.getValue().isEmpty()) {
promise.setException(new LogEmptyException("Log "
+ getFullyQualifiedName() + " has no records"));
return;
}
Future<LogRecordWithDLSN> firstRecord = null;
- for (LogSegmentMetadata ledger : ledgerList) {
+ for (LogSegmentMetadata ledger :
ledgerList.getValue()) {
if (!ledger.isTruncated() &&
(ledger.getRecordCount() > 0 || ledger.isInProgress())) {
firstRecord = asyncReadFirstUserRecord(ledger,
DLSN.InitialDLSN);
break;
@@ -399,15 +237,25 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
checkLogStreamExistsAsync().addEventListener(new
FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
- asyncGetFullLedgerListDesc(true, true).addEventListener(new
FutureEventListener<List<LogSegmentMetadata>>() {
+ readLogSegmentsFromStore(
+ LogSegmentMetadata.DESC_COMPARATOR,
+ LogSegmentFilter.DEFAULT_FILTER,
+ null
+ ).addEventListener(new
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
- public void onSuccess(List<LogSegmentMetadata> ledgerList)
{
- if (ledgerList.isEmpty()) {
- promise.setException(new LogEmptyException("Log "
+ getFullyQualifiedName() + " has no records"));
+ public void onSuccess(Versioned<List<LogSegmentMetadata>>
ledgerList) {
+ if (ledgerList.getValue().isEmpty()) {
+ promise.setException(
+ new LogEmptyException("Log " +
getFullyQualifiedName() + " has no records"));
return;
}
- asyncGetLastLogRecord(ledgerList.iterator(), promise,
recover, false, includeEndOfStream);
+ asyncGetLastLogRecord(
+ ledgerList.getValue().iterator(),
+ promise,
+ recover,
+ false,
+ includeEndOfStream);
}
@Override
@@ -537,11 +385,15 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
return checkLogStreamExistsAsync().flatMap(new Function<Void,
Future<Long>>() {
public Future<Long> apply(Void done) {
- return asyncGetFullLedgerList(true, false).flatMap(new
Function<List<LogSegmentMetadata>, Future<Long>>() {
- public Future<Long> apply(List<LogSegmentMetadata>
ledgerList) {
+ return readLogSegmentsFromStore(
+ LogSegmentMetadata.COMPARATOR,
+ LogSegmentFilter.DEFAULT_FILTER,
+ null
+ ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>,
Future<Long>>() {
+ public Future<Long>
apply(Versioned<List<LogSegmentMetadata>> ledgerList) {
- List<Future<Long>> futureCounts = new
ArrayList<Future<Long>>(ledgerList.size());
- for (LogSegmentMetadata ledger : ledgerList) {
+ List<Future<Long>> futureCounts = new
ArrayList<Future<Long>>(ledgerList.getValue().size());
+ for (LogSegmentMetadata ledger :
ledgerList.getValue()) {
if (ledger.getLogSegmentSequenceNumber() >=
beginDLSN.getLogSegmentSequenceNo()) {
futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
}
@@ -665,11 +517,25 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
return logMetadata.getFullyQualifiedName();
}
- // Ledgers Related Functions
+ // Log Segments Related Functions
+ //
// ***Note***
- // Get ledger list should go through #getCachedLogSegments as we need to
assign start sequence id for inprogress log
- // segment so the reader could generate the right sequence id.
+ // Get log segment list should go through #getCachedLogSegments as we need
to assign start sequence id
+ // for inprogress log segment so the reader could generate the right
sequence id.
+ //
+ // ***PerStreamCache vs LogSegmentMetadataCache***
+ // The per stream cache maintains the list of segments per stream, while
the metadata cache
+ // maintains log segments. The metadata cache is just to reduce the access
to zookeeper, so it is
+ // okay that some of the log segments are not in the cache; however, the
per stream cache cannot
+ // have any gaps between log segment sequence numbers, because it has to be
accurate.
+ /**
+ * Get the cached log segments.
+ *
+ * @param comparator the comparator to sort the returned log segments.
+ * @return list of sorted log segments
+ * @throws UnexpectedException if unexpected condition detected.
+ */
protected List<LogSegmentMetadata>
getCachedLogSegments(Comparator<LogSegmentMetadata> comparator)
throws UnexpectedException {
try {
@@ -683,227 +549,6 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
}
}
- protected List<LogSegmentMetadata> getFullLedgerList(boolean forceFetch,
boolean throwOnEmpty)
- throws IOException {
- return getLedgerList(forceFetch, true, LogSegmentMetadata.COMPARATOR,
throwOnEmpty);
- }
-
- protected List<LogSegmentMetadata> getFullLedgerListDesc(boolean
forceFetch, boolean throwOnEmpty)
- throws IOException {
- return getLedgerList(forceFetch, true,
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
- }
-
- protected List<LogSegmentMetadata> getFilteredLedgerList(boolean
forceFetch, boolean throwOnEmpty)
- throws IOException {
- return getLedgerList(forceFetch, false, LogSegmentMetadata.COMPARATOR,
throwOnEmpty);
- }
-
- protected List<LogSegmentMetadata> getFilteredLedgerListDesc(boolean
forceFetch, boolean throwOnEmpty)
- throws IOException {
- return getLedgerList(forceFetch, false,
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
- }
-
- protected List<LogSegmentMetadata> getLedgerList(boolean forceFetch,
- boolean fetchFullList,
-
Comparator<LogSegmentMetadata> comparator,
- boolean throwOnEmpty)
- throws IOException {
- Stopwatch stopwatch = Stopwatch.createStarted();
- boolean success = false;
- try {
- List<LogSegmentMetadata> segments =
- doGetLedgerList(forceFetch, fetchFullList, comparator,
throwOnEmpty);
- success = true;
- return segments;
- } finally {
- OpStatsLogger statsLogger = fetchFullList ? getFullListStat :
getFilteredListStat;
- if (success) {
-
statsLogger.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- } else {
-
statsLogger.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- }
- }
- }
-
- private List<LogSegmentMetadata> doGetLedgerList(boolean forceFetch,
boolean fetchFullList,
-
Comparator<LogSegmentMetadata> comparator,
- boolean throwOnEmpty)
- throws IOException {
- if (fetchFullList) {
- if (forceFetch || !isFullListFetched.get()) {
- return forceGetLedgerList(comparator,
LogSegmentFilter.DEFAULT_FILTER, throwOnEmpty);
- } else {
- return getCachedLogSegments(comparator);
- }
- } else {
- if (forceFetch) {
- return forceGetLedgerList(comparator, filter, throwOnEmpty);
- } else {
- if(!ledgerListWatchSet.get()) {
- scheduleGetLedgersTask(true, true);
- }
- waitFirstGetLedgersTaskToFinish();
- return getCachedLogSegments(comparator);
- }
- }
- }
-
- /**
- * Get a list of all segments in the journal.
- */
- protected List<LogSegmentMetadata> forceGetLedgerList(final
Comparator<LogSegmentMetadata> comparator,
- final
LogSegmentFilter segmentFilter,
- boolean
throwOnEmpty) throws IOException {
- final List<LogSegmentMetadata> ledgers = new
ArrayList<LogSegmentMetadata>();
- final AtomicInteger result = new AtomicInteger(-1);
- final CountDownLatch latch = new CountDownLatch(1);
- Stopwatch stopwatch = Stopwatch.createStarted();
- asyncGetLedgerListInternal(comparator, segmentFilter, null, new
GenericCallback<List<LogSegmentMetadata>>() {
- @Override
- public void operationComplete(int rc, List<LogSegmentMetadata>
logSegmentMetadatas) {
- result.set(rc);
- if (KeeperException.Code.OK.intValue() == rc) {
- ledgers.addAll(logSegmentMetadatas);
- } else {
- LOG.error("Failed to get ledger list for {} : with error
{}", getFullyQualifiedName(), rc);
- }
- latch.countDown();
- }
- }, new AtomicInteger(conf.getZKNumRetries()), new
AtomicLong(conf.getZKRetryBackoffStartMillis()));
- try {
- latch.await();
- } catch (InterruptedException e) {
-
forceGetListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- throw new DLInterruptedException("Interrupted on reading ledger
list from zkfor " + getFullyQualifiedName(), e);
- }
- long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-
- KeeperException.Code rc = KeeperException.Code.get(result.get());
- if (rc == KeeperException.Code.OK) {
- forceGetListStat.registerSuccessfulEvent(elapsedMicros);
- } else {
- forceGetListStat.registerFailedEvent(elapsedMicros);
- if (KeeperException.Code.NONODE == rc) {
- throw new LogNotFoundException("Log " +
getFullyQualifiedName() + " is not found");
- } else {
- throw new IOException("ZK Exception " + rc + " reading ledger
list for " + getFullyQualifiedName());
- }
- }
-
- if (throwOnEmpty && ledgers.isEmpty()) {
- throw new LogEmptyException("Log " + getFullyQualifiedName() + "
is empty");
- }
- return ledgers;
- }
-
- protected Future<List<LogSegmentMetadata>> asyncGetFullLedgerList(boolean
forceFetch, boolean throwOnEmpty) {
- return asyncGetLedgerList(forceFetch, true,
LogSegmentMetadata.COMPARATOR, throwOnEmpty);
- }
-
- protected Future<List<LogSegmentMetadata>>
asyncGetFullLedgerListDesc(boolean forceFetch, boolean throwOnEmpty) {
- return asyncGetLedgerList(forceFetch, true,
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
- }
-
- protected Future<List<LogSegmentMetadata>>
asyncGetFilteredLedgerList(boolean forceFetch, boolean throwOnEmpty) {
- return asyncGetLedgerList(forceFetch, false,
LogSegmentMetadata.COMPARATOR, throwOnEmpty);
- }
-
- protected Future<List<LogSegmentMetadata>>
asyncGetFilteredLedgerListDesc(boolean forceFetch, boolean throwOnEmpty) {
- return asyncGetLedgerList(forceFetch, false,
LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
- }
-
- protected Future<List<LogSegmentMetadata>> asyncGetLedgerList(final
boolean forceFetch,
- final
boolean fetchFullList,
- final
Comparator<LogSegmentMetadata> comparator,
- final
boolean throwOnEmpty) {
- final Promise<List<LogSegmentMetadata>> promise = new
Promise<List<LogSegmentMetadata>>();
- final Stopwatch stopwatch = Stopwatch.createStarted();
- final OpStatsLogger statsLogger = fetchFullList ? getFullListStat :
getFilteredListStat;
- asyncDoGetLedgerList(forceFetch, fetchFullList, comparator,
throwOnEmpty)
- .addEventListener(new
FutureEventListener<List<LogSegmentMetadata>>() {
- @Override
- public void onSuccess(List<LogSegmentMetadata> value) {
-
statsLogger.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- promise.setValue(value);
- }
-
- @Override
- public void onFailure(Throwable cause) {
-
statsLogger.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- promise.setException(cause);
- }
- });
- return promise;
- }
-
- private Future<List<LogSegmentMetadata>> asyncDoGetLedgerList(final
boolean forceFetch,
- final
boolean fetchFullList,
- final
Comparator<LogSegmentMetadata> comparator,
- final
boolean throwOnEmpty) {
- if (fetchFullList) {
- if (forceFetch || !isFullListFetched.get()) {
- return asyncForceGetLedgerList(comparator,
LogSegmentFilter.DEFAULT_FILTER, throwOnEmpty);
- } else {
- try {
- return Future.value(getCachedLogSegments(comparator));
- } catch (UnexpectedException ue) {
- return Future.exception(ue);
- }
- }
- } else {
- if (forceFetch) {
- return asyncForceGetLedgerList(comparator, filter,
throwOnEmpty);
- } else {
- final Promise<List<LogSegmentMetadata>> promise =
- new Promise<List<LogSegmentMetadata>>();
- SyncGetLedgersCallback task = firstGetLedgersTask;
- task.promise.addEventListener(new
FutureEventListener<List<LogSegmentMetadata>>() {
- @Override
- public void onSuccess(List<LogSegmentMetadata> value) {
- try {
- promise.setValue(getCachedLogSegments(comparator));
- } catch (UnexpectedException e) {
- promise.setException(e);
- }
- }
-
- @Override
- public void onFailure(Throwable cause) {
- promise.setException(cause);
- }
- });
- return promise;
- }
- }
- }
-
- protected Future<List<LogSegmentMetadata>> asyncForceGetLedgerList(final
Comparator<LogSegmentMetadata> comparator,
- final
LogSegmentFilter segmentFilter,
- final
boolean throwOnEmpty) {
- final Promise<List<LogSegmentMetadata>> promise = new
Promise<List<LogSegmentMetadata>>();
- final Stopwatch stopwatch = Stopwatch.createStarted();
- asyncGetLedgerListWithRetries(comparator, segmentFilter, null)
- .addEventListener(new
FutureEventListener<List<LogSegmentMetadata>>() {
-
- @Override
- public void onSuccess(List<LogSegmentMetadata> ledgers) {
-
forceGetListStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- if (ledgers.isEmpty() && throwOnEmpty) {
- promise.setException(new LogEmptyException("Log " +
getFullyQualifiedName() + " is empty"));
- } else {
- promise.setValue(ledgers);
- }
- }
-
- @Override
- public void onFailure(Throwable cause) {
-
forceGetListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- promise.setException(cause);
- }
- });
- return promise;
- }
-
/**
* Add the segment <i>metadata</i> for <i>name</i> in the cache.
*
@@ -913,6 +558,7 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
* segment metadata.
*/
protected void addLogSegmentToCache(String name, LogSegmentMetadata
metadata) {
+ metadataCache.put(metadata.getZkPath(), metadata);
logSegmentCache.add(name, metadata);
// update the last ledger rolling time
if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis <
metadata.getCompletionTime())) {
@@ -952,224 +598,189 @@ public abstract class BKLogHandler implements Watcher,
AsyncCloseable, AsyncAbor
}
}
+ /**
+ * Read log segment <i>name</i> from the cache.
+ *
+ * @param name name of the log segment
+ * @return log segment metadata
+ */
protected LogSegmentMetadata readLogSegmentFromCache(String name) {
return logSegmentCache.get(name);
}
+ /**
+ * Remove the log segment <i>name</i> from the cache.
+ *
+ * @param name name of the log segment.
+ * @return log segment metadata
+ */
protected LogSegmentMetadata removeLogSegmentFromCache(String name) {
+ metadataCache.invalidate(name);
return logSegmentCache.remove(name);
}
- public void asyncGetLedgerList(final Comparator<LogSegmentMetadata>
comparator,
- Watcher watcher,
- final
GenericCallback<List<LogSegmentMetadata>> callback) {
- asyncGetLedgerListWithRetries(comparator, filter, watcher, callback);
+ /**
+ * Update the log segment cache with updated mapping
+ *
+ * @param logSegmentsRemoved log segments removed
+ * @param logSegmentsAdded log segments added
+ */
+ protected void updateLogSegmentCache(Set<String> logSegmentsRemoved,
+ Map<String, LogSegmentMetadata>
logSegmentsAdded) {
+ for (String segmentName : logSegmentsRemoved) {
+ metadataCache.invalidate(segmentName);
+ }
+ for (Map.Entry<String, LogSegmentMetadata> entry :
logSegmentsAdded.entrySet()) {
+ metadataCache.put(entry.getKey(), entry.getValue());
+ }
+ logSegmentCache.update(logSegmentsRemoved, logSegmentsAdded);
}
- protected Future<List<LogSegmentMetadata>>
asyncGetLedgerListWithRetries(Comparator<LogSegmentMetadata> comparator,
-
LogSegmentFilter segmentFilter,
-
Watcher watcher) {
- final Promise<List<LogSegmentMetadata>> promise = new
Promise<List<LogSegmentMetadata>>();
- asyncGetLedgerListWithRetries(comparator, segmentFilter, watcher, new
GenericCallback<List<LogSegmentMetadata>>() {
- @Override
- public void operationComplete(int rc, List<LogSegmentMetadata>
segments) {
- if (KeeperException.Code.OK.intValue() == rc) {
- promise.setValue(segments);
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- promise.setException(new LogNotFoundException("Log " +
getFullyQualifiedName() + " not found"));
- } else {
- String errMsg = "ZK Exception " + rc + " reading ledger
list for " + getFullyQualifiedName();
- promise.setException(new ZKException(errMsg,
KeeperException.Code.get(rc)));
- }
- }
- });
- return promise;
- }
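+ /**
+ * Read the log segments from the store and register a listener.
+ * @param comparator the comparator used to sort the returned log segments
+ * @param segmentFilter the filter applied to the log segment names read from the store
+ * @param logSegmentNamesListener the listener passed to the store when fetching the log segment names (callers may pass null)
+ * @return a future that represents the versioned list of log segments
+ */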
+ public Future<Versioned<List<LogSegmentMetadata>>>
readLogSegmentsFromStore(
+ final Comparator<LogSegmentMetadata> comparator,
+ final LogSegmentFilter segmentFilter,
+ final LogSegmentNamesListener logSegmentNamesListener) {
+ final Promise<Versioned<List<LogSegmentMetadata>>> readResult =
+ new Promise<Versioned<List<LogSegmentMetadata>>>();
+ metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(),
logSegmentNamesListener)
+ .addEventListener(new
FutureEventListener<Versioned<List<String>>>() {
+ @Override
+ public void onFailure(Throwable cause) {
+ FutureUtils.setException(readResult, cause);
+ }
- private void asyncGetLedgerListWithRetries(final
Comparator<LogSegmentMetadata> comparator,
- final LogSegmentFilter
segmentFilter,
- final Watcher watcher,
- final
GenericCallback<List<LogSegmentMetadata>> finalCallback) {
- asyncGetLedgerListInternal(comparator, segmentFilter, watcher,
finalCallback,
- new AtomicInteger(conf.getZKNumRetries()), new
AtomicLong(conf.getZKRetryBackoffStartMillis()));
+ @Override
+ public void onSuccess(Versioned<List<String>>
logSegmentNames) {
+ readLogSegmentsFromStore(logSegmentNames, comparator,
segmentFilter, readResult);
+ }
+ });
+ return readResult;
}
- private void asyncGetLedgerListInternal(final
Comparator<LogSegmentMetadata> comparator,
+ protected void readLogSegmentsFromStore(final Versioned<List<String>>
logSegmentNames,
+ final
Comparator<LogSegmentMetadata> comparator,
final LogSegmentFilter
segmentFilter,
- final Watcher watcher,
- final
GenericCallback<List<LogSegmentMetadata>> finalCallback,
- final AtomicInteger
numAttemptsLeft,
- final AtomicLong backoffMillis) {
- final Stopwatch stopwatch = Stopwatch.createStarted();
- try {
+ final
Promise<Versioned<List<LogSegmentMetadata>>> readResult) {
+ Set<String> segmentsReceived = new HashSet<String>();
+
segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue()));
+ Set<String> segmentsAdded;
+ final Set<String> removedSegments = Collections.synchronizedSet(new
HashSet<String>());
+ final Map<String, LogSegmentMetadata> addedSegments =
+ Collections.synchronizedMap(new HashMap<String,
LogSegmentMetadata>());
+ Pair<Set<String>, Set<String>> segmentChanges =
logSegmentCache.diff(segmentsReceived);
+ segmentsAdded = segmentChanges.getLeft();
+ removedSegments.addAll(segmentChanges.getRight());
+
+ if (segmentsAdded.isEmpty()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Async getting ledger list for {}.",
getFullyQualifiedName());
+ LOG.trace("No segments added for {}.",
getFullyQualifiedName());
}
- final GenericCallback<List<LogSegmentMetadata>> callback = new
GenericCallback<List<LogSegmentMetadata>>() {
- @Override
- public void operationComplete(int rc, List<LogSegmentMetadata>
result) {
- long elapsedMicros =
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
- if (KeeperException.Code.OK.intValue() != rc) {
- getListStat.registerFailedEvent(elapsedMicros);
- } else {
- if (LogSegmentFilter.DEFAULT_FILTER == segmentFilter) {
- isFullListFetched.set(true);
- }
- getListStat.registerSuccessfulEvent(elapsedMicros);
- }
- finalCallback.operationComplete(rc, result);
- }
- };
-
zooKeeperClient.get().getChildren(logMetadata.getLogSegmentsPath(), watcher,
new AsyncCallback.Children2Callback() {
- @Override
- public void processResult(final int rc, final String path,
final Object ctx, final List<String> children, final Stat stat) {
- if (KeeperException.Code.OK.intValue() != rc) {
- if ((KeeperException.Code.CONNECTIONLOSS.intValue() ==
rc ||
- KeeperException.Code.SESSIONEXPIRED.intValue() ==
rc ||
- KeeperException.Code.SESSIONMOVED.intValue() ==
rc) &&
- numAttemptsLeft.decrementAndGet() > 0) {
- long backoffMs = backoffMillis.get();
-
backoffMillis.set(Math.min(conf.getZKRetryBackoffMaxMillis(), 2 * backoffMs));
- scheduler.schedule(new Runnable() {
- @Override
- public void run() {
- asyncGetLedgerListInternal(comparator,
segmentFilter, watcher,
- finalCallback, numAttemptsLeft,
backoffMillis);
- }
- }, backoffMs, TimeUnit.MILLISECONDS);
- return;
- }
- callback.operationComplete(rc, null);
- return;
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Got ledger list from {} : {}",
logMetadata.getLogSegmentsPath(), children);
- }
+ // update the cache before #getCachedLogSegments to return
+ updateLogSegmentCache(removedSegments, addedSegments);
- ledgerListWatchSet.set(true);
- Set<String> segmentsReceived = new HashSet<String>();
- segmentsReceived.addAll(segmentFilter.filter(children));
- Set<String> segmentsAdded;
- final Set<String> removedSegments =
Collections.synchronizedSet(new HashSet<String>());
- final Map<String, LogSegmentMetadata> addedSegments =
- Collections.synchronizedMap(new HashMap<String,
LogSegmentMetadata>());
- Pair<Set<String>, Set<String>> segmentChanges =
logSegmentCache.diff(segmentsReceived);
- segmentsAdded = segmentChanges.getLeft();
- removedSegments.addAll(segmentChanges.getRight());
+ List<LogSegmentMetadata> segmentList;
+ try {
+ segmentList = getCachedLogSegments(comparator);
+ } catch (UnexpectedException e) {
+ FutureUtils.setException(readResult, e);
+ return;
+ }
- if (segmentsAdded.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("No segments added for {}.",
getFullyQualifiedName());
- }
+ FutureUtils.setValue(readResult,
+ new Versioned<List<LogSegmentMetadata>>(segmentList,
logSegmentNames.getVersion()));
+ return;
+ }
- // update the cache before fetch
- logSegmentCache.update(removedSegments, addedSegments);
+ final AtomicInteger numChildren = new
AtomicInteger(segmentsAdded.size());
+ final AtomicInteger numFailures = new AtomicInteger(0);
+ for (final String segment: segmentsAdded) {
+ String logSegmentPath = logMetadata.getLogSegmentPath(segment);
+ LogSegmentMetadata cachedSegment =
metadataCache.get(logSegmentPath);
+ if (null != cachedSegment) {
+ addedSegments.put(segment, cachedSegment);
+ completeReadLogSegmentsFromStore(
+ removedSegments,
+ addedSegments,
+ comparator,
+ readResult,
+ logSegmentNames.getVersion(),
+ numChildren,
+ numFailures);
+ continue;
+ }
+ metadataStore.getLogSegment(logSegmentPath)
+ .addEventListener(new
FutureEventListener<LogSegmentMetadata>() {
- List<LogSegmentMetadata> segmentList;
- try {
- segmentList = getCachedLogSegments(comparator);
- } catch (UnexpectedException e) {
-
callback.operationComplete(KeeperException.Code.DATAINCONSISTENCY.intValue(),
null);
- return;
- }
-
callback.operationComplete(KeeperException.Code.OK.intValue(), segmentList);
- notifyUpdatedLogSegments(segmentList);
- if (!removedSegments.isEmpty()) {
- notifyOnOperationComplete();
+ @Override
+ public void onSuccess(LogSegmentMetadata result) {
+ addedSegments.put(segment, result);
+ complete();
}
- return;
- }
- final AtomicInteger numChildren = new
AtomicInteger(segmentsAdded.size());
- final AtomicInteger numFailures = new AtomicInteger(0);
- for (final String segment: segmentsAdded) {
-
metadataStore.getLogSegment(logMetadata.getLogSegmentPath(segment))
- .addEventListener(new
FutureEventListener<LogSegmentMetadata>() {
-
- @Override
- public void onSuccess(LogSegmentMetadata
result) {
- addedSegments.put(segment, result);
- complete();
- }
-
- @Override
- public void onFailure(Throwable cause) {
- // NONODE exception is possible in two
cases
- // 1. A log segment was deleted by
truncation between the call to getChildren and read
- // attempt on the znode corresponding
to the segment
- // 2. In progress segment has been
completed => inprogress ZNode does not exist
- if (cause instanceof KeeperException &&
- KeeperException.Code.NONODE ==
((KeeperException) cause).code()) {
- removedSegments.add(segment);
- complete();
- } else {
- // fail fast
- if (1 ==
numFailures.incrementAndGet()) {
- int rcToReturn =
KeeperException.Code.SYSTEMERROR.intValue();
- if (cause instanceof
KeeperException) {
- rcToReturn =
((KeeperException) cause).code().intValue();
- } else if (cause instanceof
ZKException) {
- rcToReturn =
((ZKException) cause).getKeeperExceptionCode().intValue();
- }
- // :( properly we need dlog
related response code.
-
callback.operationComplete(rcToReturn, null);
- return;
- }
- }
- }
+ @Override
+ public void onFailure(Throwable cause) {
+ // LogSegmentNotFoundException exception is
possible in two cases
+ // 1. A log segment was deleted by truncation
between the call to getChildren and read
+ // attempt on the znode corresponding to the
segment
+ // 2. In progress segment has been completed =>
inprogress ZNode does not exist
+ if (cause instanceof LogSegmentNotFoundException) {
+ removedSegments.add(segment);
+ complete();
+ } else {
+ // fail fast
+ if (1 == numFailures.incrementAndGet()) {
+ FutureUtils.setException(readResult,
cause);
+ return;
+ }
+ }
+ }
- private void complete() {
- if (0 == numChildren.decrementAndGet()
&& numFailures.get() == 0) {
- // update the cache only when
fetch completed
-
logSegmentCache.update(removedSegments, addedSegments);
- List<LogSegmentMetadata>
segmentList;
- try {
- segmentList =
getCachedLogSegments(comparator);
- } catch (UnexpectedException e) {
-
callback.operationComplete(KeeperException.Code.DATAINCONSISTENCY.intValue(),
null);
- return;
- }
-
callback.operationComplete(KeeperException.Code.OK.intValue(), segmentList);
-
notifyUpdatedLogSegments(segmentList);
- notifyOnOperationComplete();
- }
- }
- });
- }
- }
- }, null);
- } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-
getListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-
finalCallback.operationComplete(KeeperException.Code.CONNECTIONLOSS.intValue(),
null);
- } catch (InterruptedException e) {
-
getListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-
finalCallback.operationComplete(KeeperException.Code.CONNECTIONLOSS.intValue(),
null);
+ private void complete() {
+ completeReadLogSegmentsFromStore(
+ removedSegments,
+ addedSegments,
+ comparator,
+ readResult,
+ logSegmentNames.getVersion(),
+ numChildren,
+ numFailures);
+ }
+ });
}
}
- @Override
- public void process(WatchedEvent event) {
- if (Watcher.Event.EventType.None.equals(event.getType())) {
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
- // if the watcher is expired
- scheduler.schedule(new
WatcherGetLedgersCallback(getFullyQualifiedName()),
- conf.getZKRetryBackoffStartMillis(),
TimeUnit.MILLISECONDS);
- }
- } else if
(Watcher.Event.EventType.NodeChildrenChanged.equals(event.getType())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("LogSegments Changed under {}.",
getFullyQualifiedName());
- }
- asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR,
filter,
- getChildrenWatcher, new
WatcherGetLedgersCallback(getFullyQualifiedName()));
+ private void completeReadLogSegmentsFromStore(final Set<String>
removedSegments,
+ final Map<String,
LogSegmentMetadata> addedSegments,
+ final
Comparator<LogSegmentMetadata> comparator,
+ final
Promise<Versioned<List<LogSegmentMetadata>>> readResult,
+ final Version
logSegmentNamesVersion,
+ final AtomicInteger
numChildren,
+ final AtomicInteger
numFailures) {
+ if (0 != numChildren.decrementAndGet()) {
+ return;
}
- }
-
- void notifyOnOperationComplete() {
- if (null != notification) {
- notification.notifyOnOperationComplete();
+ if (numFailures.get() > 0) {
+ return;
+ }
+ // update the cache only when fetch completed and before
#getCachedLogSegments
+ updateLogSegmentCache(removedSegments, addedSegments);
+ List<LogSegmentMetadata> segmentList;
+ try {
+ segmentList = getCachedLogSegments(comparator);
+ } catch (UnexpectedException e) {
+ FutureUtils.setException(readResult, e);
+ return;
}
+ FutureUtils.setValue(readResult,
+ new Versioned<List<LogSegmentMetadata>>(segmentList,
logSegmentNamesVersion));
}
}