http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 6a8f90e..a1e29a2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -18,16 +18,25 @@ package com.twitter.distributedlog; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Ticker; +import com.twitter.distributedlog.callback.LogSegmentListener; +import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.LockCancelledException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; +import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.lock.DistributedLock; @@ -35,6 +44,7 @@ import com.twitter.distributedlog.lock.SessionLockFactory; import 
com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.logsegment.LogSegmentFilter; +import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.readahead.ReadAheadWorker; import com.twitter.distributedlog.stats.BroadCastStatsLogger; @@ -55,6 +65,8 @@ import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -102,11 +114,9 @@ import scala.runtime.BoxedUnit; * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock} * for detail stats. 
*/ -class BKLogReadHandler extends BKLogHandler { +class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); - private static final int LAYOUT_VERSION = -1; - protected final ZKLogMetadataForReader logMetadataForReader; protected final ReadAheadCache readAheadCache; protected final LedgerHandleCache handleCache; @@ -123,6 +133,16 @@ class BKLogReadHandler extends BKLogHandler { private DistributedLock readLock; private Future<Void> lockAcquireFuture; + // notify the state change about the read handler + protected final AsyncNotification readerStateNotification; + + // log segments listener + protected boolean logSegmentsNotificationDisabled = false; + protected final CopyOnWriteArraySet<LogSegmentListener> listeners = + new CopyOnWriteArraySet<LogSegmentListener>(); + protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments = + new Versioned<List<LogSegmentMetadata>>(null, Version.NEW); + // stats private final AlertStatsLogger alertStatsLogger; private final StatsLogger handlerStatsLogger; @@ -132,26 +152,35 @@ class BKLogReadHandler extends BKLogHandler { /** * Construct a Bookkeeper journal manager. 
*/ - public BKLogReadHandler(ZKLogMetadataForReader logMetadata, - Optional<String> subscriberId, - DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - ZooKeeperClientBuilder zkcBuilder, - BookKeeperClientBuilder bkcBuilder, - LogSegmentMetadataStore metadataStore, - OrderedScheduler scheduler, - OrderedScheduler lockStateExecutor, - OrderedScheduler readAheadExecutor, - AlertStatsLogger alertStatsLogger, - ReadAheadExceptionsLogger readAheadExceptionsLogger, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - String clientId, - AsyncNotification notification, - boolean isHandleForReading, - boolean deserializeRecordSet) { - super(logMetadata, conf, zkcBuilder, bkcBuilder, metadataStore, scheduler, - statsLogger, alertStatsLogger, notification, LogSegmentFilter.DEFAULT_FILTER, clientId); + BKLogReadHandler(ZKLogMetadataForReader logMetadata, + Optional<String> subscriberId, + DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + ZooKeeperClientBuilder zkcBuilder, + BookKeeperClientBuilder bkcBuilder, + LogSegmentMetadataStore metadataStore, + LogSegmentMetadataCache metadataCache, + OrderedScheduler scheduler, + OrderedScheduler lockStateExecutor, + OrderedScheduler readAheadExecutor, + AlertStatsLogger alertStatsLogger, + ReadAheadExceptionsLogger readAheadExceptionsLogger, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + String clientId, + AsyncNotification readerStateNotification, + boolean isHandleForReading, + boolean deserializeRecordSet) { + super(logMetadata, + conf, + zkcBuilder, + bkcBuilder, + metadataStore, + metadataCache, + scheduler, + statsLogger, + alertStatsLogger, + clientId); this.logMetadataForReader = logMetadata; this.dynConf = dynConf; this.readAheadExecutor = readAheadExecutor; @@ -161,6 +190,7 @@ class BKLogReadHandler extends BKLogHandler { this.handlerStatsLogger = BroadCastStatsLogger.masterslave(this.perLogStatsLogger, statsLogger); 
this.readAheadExceptionsLogger = readAheadExceptionsLogger; + this.readerStateNotification = readerStateNotification; handleCache = LedgerHandleCache.newBuilder() .bkc(this.bookKeeperClient) @@ -171,7 +201,7 @@ class BKLogReadHandler extends BKLogHandler { getFullyQualifiedName(), handlerStatsLogger, alertStatsLogger, - notification, + readerStateNotification, dynConf.getReadAheadMaxRecords(), deserializeRecordSet, conf.getTraceReadAheadDeliveryLatency(), @@ -308,14 +338,14 @@ class BKLogReadHandler extends BKLogHandler { if (null != readAheadCache) { readAheadCache.clear(); } + if (null != readAheadWorker) { + unregisterListener(readAheadWorker); + } if (null != handleCache) { handleCache.clear(); } - // No-op - zooKeeperClient.getWatcherManager().unregisterChildWatcher( - logMetadata.getLogSegmentsPath(), - BKLogReadHandler.this, - true); + // unregister the log segment listener + metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this); return Future.Void(); } }); @@ -326,6 +356,52 @@ class BKLogReadHandler extends BKLogHandler { return asyncClose(); } + /** + * Start fetch the log segments and register the {@link LogSegmentNamesListener}. + * The future is satisfied only on a successful fetch or encountered a fatal failure. 
+ * + * @return future represents the fetch result + */ + Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() { + Promise<Versioned<List<LogSegmentMetadata>>> promise = + new Promise<Versioned<List<LogSegmentMetadata>>>(); + asyncStartFetchLogSegments(promise); + return promise; + } + + void asyncStartFetchLogSegments(final Promise<Versioned<List<LogSegmentMetadata>>> promise) { + readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + this).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof LogNotFoundException || + cause instanceof LogSegmentNotFoundException || + cause instanceof UnexpectedException) { + // indicate some inconsistent behavior, abort + metadataException.compareAndSet(null, (IOException) cause); + // notify the reader that read handler is in error state + notifyReaderOnError(cause); + FutureUtils.setException(promise, cause); + return; + } + scheduler.schedule(new Runnable() { + @Override + public void run() { + asyncStartFetchLogSegments(promise); + } + }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { + // no-op + FutureUtils.setValue(promise, segments); + } + }); + } + public void startReadAhead(LedgerReadPosition startPosition, AsyncFailureInjector failureInjector) { if (null == readAheadWorker) { @@ -334,7 +410,6 @@ class BKLogReadHandler extends BKLogHandler { dynConf, logMetadataForReader, this, - zooKeeperClient, readAheadExecutor, handleCache, startPosition, @@ -345,8 +420,16 @@ class BKLogReadHandler extends BKLogHandler { perLogStatsLogger, alertStatsLogger, failureInjector, - notification); - readAheadWorker.start(); + readerStateNotification); + registerListener(readAheadWorker); + // start the readahead worker after the log segments are fetched + 
asyncStartFetchLogSegments().map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { + @Override + public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { + readAheadWorker.start(logSegments.getValue()); + return BoxedUnit.UNIT; + } + }); } } @@ -407,10 +490,110 @@ class BKLogReadHandler extends BKLogHandler { } @VisibleForTesting - void disableReadAheadZKNotification() { - if (null != readAheadWorker) { - readAheadWorker.disableZKNotification(); + void disableReadAheadLogSegmentsNotification() { + logSegmentsNotificationDisabled = true; + } + + @Override + public void onSegmentsUpdated(final Versioned<List<String>> segments) { + synchronized (this) { + if (lastNotifiedLogSegments.getVersion() != Version.NEW && + lastNotifiedLogSegments.getVersion().compare(segments.getVersion()) != Version.Occurred.BEFORE) { + // the log segments has been read, and it is possibly a retry from last segments update + return; + } + } + + Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise = + new Promise<Versioned<List<LogSegmentMetadata>>>(); + readLogSegmentsPromise.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof LogNotFoundException || + cause instanceof LogSegmentNotFoundException || + cause instanceof UnexpectedException) { + // indicate some inconsistent behavior, abort + metadataException.compareAndSet(null, (IOException) cause); + // notify the reader that read handler is in error state + notifyReaderOnError(cause); + return; + } + scheduler.schedule(new Runnable() { + @Override + public void run() { + onSegmentsUpdated(segments); + } + }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> logSegments) { + List<LogSegmentMetadata> segmentsToNotify = null; + synchronized (BKLogReadHandler.this) { + 
Versioned<List<LogSegmentMetadata>> lastLogSegments = lastNotifiedLogSegments; + if (lastLogSegments.getVersion() == Version.NEW || + lastLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) { + lastNotifiedLogSegments = logSegments; + segmentsToNotify = logSegments.getValue(); + } + } + if (null != segmentsToNotify) { + notifyUpdatedLogSegments(segmentsToNotify); + } + } + }); + // log segments list is updated, read their metadata + readLogSegmentsFromStore( + segments, + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + readLogSegmentsPromise); + } + + @Override + public void onLogStreamDeleted() { + notifyLogStreamDeleted(); + } + + // + // Listener for log segments + // + + protected void registerListener(LogSegmentListener listener) { + listeners.add(listener); + } + + protected void unregisterListener(LogSegmentListener listener) { + listeners.remove(listener); + } + + protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) { + if (logSegmentsNotificationDisabled) { + return; + } + + for (LogSegmentListener listener : listeners) { + List<LogSegmentMetadata> listToReturn = + new ArrayList<LogSegmentMetadata>(segments); + Collections.sort(listToReturn, LogSegmentMetadata.COMPARATOR); + listener.onSegmentsUpdated(listToReturn); + } + } + + protected void notifyLogStreamDeleted() { + if (logSegmentsNotificationDisabled) { + return; + } + + for (LogSegmentListener listener : listeners) { + listener.onLogStreamDeleted(); } } + // notify the errors + protected void notifyReaderOnError(Throwable cause) { + if (null != readerStateNotification) { + readerStateNotification.notifyOnError(cause); + } + } }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 4665ed5..5d3be7d 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -33,6 +33,8 @@ import com.twitter.distributedlog.function.GetLastTxIdFunction; import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; +import com.twitter.distributedlog.logsegment.LogSegmentFilter; +import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.logsegment.RollingPolicy; import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy; @@ -63,6 +65,7 @@ import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; @@ -76,6 +79,7 @@ import scala.runtime.AbstractFunction1; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -99,9 +103,6 @@ class BKLogWriteHandler extends BKLogHandler { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); 
protected final DistributedLock lock; - protected final int ensembleSize; - protected final int writeQuorumSize; - protected final int ackQuorumSize; protected final LedgerAllocator ledgerAllocator; protected final MaxTxId maxTxId; protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo; @@ -117,6 +118,10 @@ class BKLogWriteHandler extends BKLogHandler { // tracking the inprogress log segments protected final LinkedList<Long> inprogressLSSNs; + // Fetch LogSegments State: write can continue without full list of log segments while truncation needs + private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite; + private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation; + // Recover Functions private final RecoverLogSegmentFunction recoverLogSegmentFunction = new RecoverLogSegmentFunction(); @@ -162,6 +167,7 @@ class BKLogWriteHandler extends BKLogHandler { ZooKeeperClientBuilder zkcBuilder, BookKeeperClientBuilder bkcBuilder, LogSegmentMetadataStore metadataStore, + LogSegmentMetadataCache metadataCache, OrderedScheduler scheduler, LedgerAllocator allocator, StatsLogger statsLogger, @@ -173,8 +179,16 @@ class BKLogWriteHandler extends BKLogHandler { FeatureProvider featureProvider, DynamicDistributedLogConfiguration dynConf, DistributedLock lock /** owned by handler **/) { - super(logMetadata, conf, zkcBuilder, bkcBuilder, metadataStore, - scheduler, statsLogger, alertStatsLogger, null, WRITE_HANDLE_FILTER, clientId); + super(logMetadata, + conf, + zkcBuilder, + bkcBuilder, + metadataStore, + metadataCache, + scheduler, + statsLogger, + alertStatsLogger, + clientId); this.perLogStatsLogger = perLogStatsLogger; this.writeLimiter = writeLimiter; this.featureProvider = featureProvider; @@ -183,23 +197,6 @@ class BKLogWriteHandler extends BKLogHandler { this.lock = lock; this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - ensembleSize = conf.getEnsembleSize(); - - if (ensembleSize < 
conf.getWriteQuorumSize()) { - writeQuorumSize = ensembleSize; - LOG.warn("Setting write quorum size {} greater than ensemble size {}", - conf.getWriteQuorumSize(), ensembleSize); - } else { - writeQuorumSize = conf.getWriteQuorumSize(); - } - if (writeQuorumSize < conf.getAckQuorumSize()) { - ackQuorumSize = writeQuorumSize; - LOG.warn("Setting write ack quorum size {} greater than write quorum size {}", - conf.getAckQuorumSize(), writeQuorumSize); - } else { - ackQuorumSize = conf.getAckQuorumSize(); - } - if (conf.getEncodeRegionIDInLogSegmentMetadata()) { this.regionId = regionId; } else { @@ -215,9 +212,12 @@ class BKLogWriteHandler extends BKLogHandler { maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(), conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData()); - // Schedule fetching ledgers list in background before we access it. - // We don't need to watch the ledgers list changes for writer, as it manages ledgers list. - scheduleGetLedgersTask(false, true); + // Schedule fetching log segment list in background before we access it. + // We don't need to watch the log segment list changes for writer, as it manages log segment list. + fetchForWrite = readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + WRITE_HANDLE_FILTER, + null); // Initialize other parameters. 
setLastLedgerRollingTimeMillis(Utils.nowInMillis()); @@ -237,6 +237,59 @@ class BKLogWriteHandler extends BKLogHandler { deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete"); } + private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch( + final Comparator<LogSegmentMetadata> comparator) { + final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); + fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { + try { + FutureUtils.setValue(promise, getCachedLogSegments(comparator)); + } catch (UnexpectedException e) { + FutureUtils.setException(promise, e); + } + } + }); + return promise; + } + + private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch( + final Comparator<LogSegmentMetadata> comparator) { + Future<Versioned<List<LogSegmentMetadata>>> result; + synchronized (this) { + if (null == fetchForTruncation) { + fetchForTruncation = readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null); + } + result = fetchForTruncation; + } + + final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); + result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { + try { + FutureUtils.setValue(promise, getCachedLogSegments(comparator)); + } catch (UnexpectedException e) { + FutureUtils.setException(promise, e); + } + } + }); + return promise; + } + // Transactional operations for MaxLogSegmentSequenceNo void storeMaxSequenceNumber(final Transaction txn, final 
MaxLogSegmentSequenceNo maxSeqNo, @@ -413,7 +466,7 @@ class BKLogWriteHandler extends BKLogHandler { boolean logSegmentsFound = false; if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) { - List<LogSegmentMetadata> ledgerListDesc = getFilteredLedgerListDesc(false, false); + List<LogSegmentMetadata> ledgerListDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc); if (null == nextLogSegmentSeqNo) { @@ -452,17 +505,27 @@ class BKLogWriteHandler extends BKLogHandler { return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); } - protected Future<BKLogSegmentWriter> asyncStartLogSegment(long txId, - boolean bestEffort, - boolean allowMaxTxID) { - Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>(); + protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId, + final boolean bestEffort, + final boolean allowMaxTxID) { + final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>(); try { lock.checkOwnershipAndReacquire(); } catch (LockingException e) { FutureUtils.setException(promise, e); return promise; } - doStartLogSegment(txId, bestEffort, allowMaxTxID, promise); + fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> list) { + doStartLogSegment(txId, bestEffort, allowMaxTxID, promise); + } + }); return promise; } @@ -732,7 +795,8 @@ class BKLogWriteHandler extends BKLogHandler { // we only record sequence id when both write version and logsegment's version support sequence id if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion()) && segment.supportsSequenceId()) { - List<LogSegmentMetadata> logSegmentDescList = 
getFilteredLedgerListDesc(false, false); + List<LogSegmentMetadata> logSegmentDescList = + getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment); } @@ -776,14 +840,46 @@ class BKLogWriteHandler extends BKLogHandler { } protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName, - long logSegmentSeqNo, - long ledgerId, - long firstTxId, - long lastTxId, - int recordCount, - long lastEntryId, - long lastSlotId, + final long logSegmentSeqNo, + final long ledgerId, + final long firstTxId, + final long lastTxId, + final int recordCount, + final long lastEntryId, + final long lastSlotId, final Promise<LogSegmentMetadata> promise) { + fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { + doCompleteAndCloseLogSegmentAfterLogSegmentListFetched( + inprogressZnodeName, + logSegmentSeqNo, + ledgerId, + firstTxId, + lastTxId, + recordCount, + lastEntryId, + lastSlotId, + promise); + } + }); + } + + private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched( + final String inprogressZnodeName, + long logSegmentSeqNo, + long ledgerId, + long firstTxId, + long lastTxId, + int recordCount, + long lastEntryId, + long lastSlotId, + final Promise<LogSegmentMetadata> promise) { try { lock.checkOwnershipAndReacquire(); } catch (IOException ioe) { @@ -912,7 +1008,7 @@ class BKLogWriteHandler extends BKLogHandler { } catch (IOException ioe) { return Future.exception(ioe); } - return asyncGetFilteredLedgerList(false, false).flatMap(recoverLogSegmentsFunction); + return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction); } class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, 
Future<LogSegmentMetadata>> { @@ -1002,8 +1098,7 @@ class BKLogWriteHandler extends BKLogHandler { List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0); return Future.value(emptyList); } - scheduleGetAllLedgersTaskIfNeeded(); - return asyncGetFullLedgerList(false, false).flatMap( + return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap( new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { @Override public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { @@ -1068,7 +1163,7 @@ class BKLogWriteHandler extends BKLogHandler { return Future.exception(new IllegalArgumentException( "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName())); } - return asyncGetFullLedgerList(false, false).flatMap( + return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap( new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { @Override public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { @@ -1097,7 +1192,7 @@ class BKLogWriteHandler extends BKLogHandler { } Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) { - return asyncGetFullLedgerList(true, false).flatMap( + return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap( new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { @Override public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { @@ -1260,16 +1355,7 @@ class BKLogWriteHandler extends BKLogHandler { return Utils.closeSequence(scheduler, lock, ledgerAllocator - ).flatMap(new AbstractFunction1<Void, Future<Void>>() { - @Override - public Future<Void> apply(Void result) { - zooKeeperClient.getWatcherManager().unregisterChildWatcher( - logMetadata.getLogSegmentsPath(), - BKLogWriteHandler.this, - false); - return Future.Void(); 
- } - }); + ); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java index ac670c2..28e69b2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java @@ -238,8 +238,8 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo // Test Methods // @VisibleForTesting - void disableReadAheadZKNotification() { - reader.bkLedgerManager.disableReadAheadZKNotification(); + void disableReadAheadLogSegmentsNotification() { + reader.bkLedgerManager.disableReadAheadLogSegmentsNotification(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index 5d0e59a..1f5427c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -212,6 +212,14 @@ public class DistributedLogConfiguration extends CompositeConfiguration { public static final String BKDL_UNPARTITIONED_STREAM_NAME = "unpartitionedStreamName"; public static final String BKDL_UNPARTITIONED_STREAM_NAME_DEFAULT = "<default>"; + // Log Segment 
Cache Parameters + public static final String BKDL_LOGSEGMENT_CACHE_TTL_MS = "logSegmentCacheTTLMs"; + public static final long BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT = 600000; // 10 mins + public static final String BKDL_LOGSEGMENT_CACHE_MAX_SIZE = "logSegmentCacheMaxSize"; + public static final long BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT = 10000; + public static final String BKDL_LOGSEGMENT_CACHE_ENABLED = "logSegmentCacheEnabled"; + public static final boolean BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT = true; + // // DL Writer Settings // @@ -1644,6 +1652,69 @@ public class DistributedLogConfiguration extends CompositeConfiguration { } // + // LogSegment Cache Settings + // + + /** + * Get the log segment cache entry TTL in milliseconds. + * + * @return log segment cache ttl in milliseconds. + */ + public long getLogSegmentCacheTTLMs() { + return getLong(BKDL_LOGSEGMENT_CACHE_TTL_MS, BKDL_LOGSEGMENT_CACHE_TTL_MS_DEFAULT); + } + + /** + * Set the log segment cache entry TTL in milliseconds. + * + * @param ttlMs TTL in milliseconds + * @return distributedlog configuration + */ + public DistributedLogConfiguration setLogSegmentCacheTTLMs(long ttlMs) { + setProperty(BKDL_LOGSEGMENT_CACHE_TTL_MS, ttlMs); + return this; + } + + /** + * Get the maximum size of the log segment cache. + * + * @return maximum size of the log segment cache. + */ + public long getLogSegmentCacheMaxSize() { + return getLong(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, BKDL_LOGSEGMENT_CACHE_MAX_SIZE_DEFAULT); + } + + /** + * Set the maximum size of the log segment cache. + * + * @param maxSize maximum size of the log segment cache. + * @return distributedlog configuration + */ + public DistributedLogConfiguration setLogSegmentCacheMaxSize(long maxSize) { + setProperty(BKDL_LOGSEGMENT_CACHE_MAX_SIZE, maxSize); + return this; + } + + /** + * Is log segment cache enabled? 
+ * + * @return true if log segment cache is enabled; otherwise false + */ + public boolean isLogSegmentCacheEnabled() { + return getBoolean(BKDL_LOGSEGMENT_CACHE_ENABLED, BKDL_LOGSEGMENT_CACHE_ENABLED_DEFAULT); + } + + /** + * Enable/disable log segment cache. + * + * @return distributedlog configuration + */ + public DistributedLogConfiguration setLogSegmentCacheEnabled(boolean enabled) { + setProperty(BKDL_LOGSEGMENT_CACHE_ENABLED, enabled); + return this; + } + + // // DL Writer General Settings // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java index 994b141..2297579 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java @@ -23,6 +23,8 @@ import java.util.Comparator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; +import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Utils; import com.twitter.util.Future; @@ -600,12 +602,18 @@ public class LogSegmentMetadata { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc))); + if (KeeperException.Code.NONODE.intValue() == rc) { + FutureUtils.setException(result, new LogSegmentNotFoundException(path)); + } else { + FutureUtils.setException(result, + 
new ZKException("Failed to read log segment metadata from " + path, + KeeperException.Code.get(rc))); + } return; } try { LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck); - result.setValue(metadata); + FutureUtils.setValue(result, metadata); } catch (IOException ie) { LOG.error("Error on parsing log segment metadata from {} : ", path, ie); result.setException(ie); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java index bedd4dc..2196245 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java @@ -22,7 +22,7 @@ import com.twitter.distributedlog.LogSegmentMetadata; import java.util.List; /** - * Listener on log segments changes for a given stream. + * Listener on log segments changes for a given stream used by {@link com.twitter.distributedlog.BKLogReadHandler} */ public interface LogSegmentListener { @@ -34,4 +34,9 @@ public interface LogSegmentListener { * updated list of segments. */ void onSegmentsUpdated(List<LogSegmentMetadata> segments); + + /** + * Notified when the log stream is deleted. 
+ */ + void onLogStreamDeleted(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java index 3e89431..e38f305 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java @@ -17,10 +17,13 @@ */ package com.twitter.distributedlog.callback; +import org.apache.bookkeeper.versioning.Versioned; + import java.util.List; /** - * Listener on list of log segments changes for a given stream. + * Listener on list of log segments changes for a given stream used by + * {@link com.twitter.distributedlog.logsegment.LogSegmentMetadataStore}. */ public interface LogSegmentNamesListener { /** @@ -30,5 +33,10 @@ public interface LogSegmentNamesListener { * @param segments * updated list of segments. */ - void onSegmentsUpdated(List<String> segments); + void onSegmentsUpdated(Versioned<List<String>> segments); + + /** + * Notified when the log stream is deleted. 
+ */ + void onLogStreamDeleted(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java new file mode 100644 index 0000000..698a088 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.function; + +import com.twitter.distributedlog.io.AsyncCloseable; +import scala.Function0; +import scala.runtime.AbstractFunction0; +import scala.runtime.BoxedUnit; + +/** + * Function to close {@link com.twitter.distributedlog.io.AsyncCloseable} + */ +public class CloseAsyncCloseableFunction extends AbstractFunction0<BoxedUnit> { + + /** + * Return a function to close an {@link AsyncCloseable}. 
+ * + * @param closeable closeable to close + * @return function to close an {@link AsyncCloseable} + */ + public static Function0<BoxedUnit> of(AsyncCloseable closeable) { + return new CloseAsyncCloseableFunction(closeable); + } + + private final AsyncCloseable closeable; + + private CloseAsyncCloseableFunction(AsyncCloseable closeable) { + this.closeable = closeable; + } + + @Override + public BoxedUnit apply() { + closeable.asyncClose(); + return BoxedUnit.UNIT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java new file mode 100644 index 0000000..4e7844c --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.function; + +import com.twitter.distributedlog.LogSegmentMetadata; +import org.apache.bookkeeper.versioning.Versioned; +import scala.Function1; +import scala.runtime.AbstractFunction1; + +import java.util.List; + +/** + * Function to get the versioned value from {@link org.apache.bookkeeper.versioning.Versioned} + */ +public class GetVersionedValueFunction<T> extends AbstractFunction1<Versioned<T>, T> { + + public static final Function1<Versioned<List<LogSegmentMetadata>>, List<LogSegmentMetadata>> + GET_LOGSEGMENT_LIST_FUNC = new GetVersionedValueFunction<List<LogSegmentMetadata>>(); + + @Override + public T apply(Versioned<T> versionedValue) { + return versionedValue.getValue(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java index cb53b23..f0d2797 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -22,6 +22,8 @@ import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.callback.LogSegmentNamesListener; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; @@ -88,30 +90,21 @@ 
public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch public void onSuccess(final Versioned<List<String>> segments) { // reset the back off after a successful operation currentZKBackOffMs = store.minZKBackoffMs; - final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = - store.listeners.get(logSegmentsPath); - if (null != listenerSet) { - store.submitTask(logSegmentsPath, new Runnable() { - @Override - public void run() { - for (VersionedLogSegmentNamesListener listener : listenerSet.values()) { - listener.onSegmentsUpdated(segments); - } - } - }); - } + store.notifyLogSegmentsUpdated( + logSegmentsPath, + store.listeners.get(logSegmentsPath), + segments); } @Override public void onFailure(Throwable cause) { - int backoffMs = store.minZKBackoffMs; - if ((cause instanceof KeeperException)) { - KeeperException ke = (KeeperException) cause; - if (KeeperException.Code.NONODE == ke.code()) { - // the log segment has been deleted, remove all the registered listeners - store.listeners.remove(logSegmentsPath); - return; - } + int backoffMs; + if (cause instanceof LogNotFoundException) { + // the log segment has been deleted, remove all the registered listeners + store.notifyLogStreamDeleted(logSegmentsPath, + store.listeners.remove(logSegmentsPath)); + return; + } else { backoffMs = currentZKBackOffMs; currentZKBackOffMs = Math.min(2 * currentZKBackOffMs, store.maxZKBackoffMs); } @@ -121,7 +114,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override public void run() { if (null != store.listeners.get(logSegmentsPath)) { - store.getLogSegmentNames(logSegmentsPath, store).addEventListener(this); + store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this); } else { logger.debug("Log segments listener for {} has been removed.", logSegmentsPath); } @@ -146,7 +139,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch if 
(lastNotifiedLogSegments.getVersion() == Version.NEW || lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) { lastNotifiedLogSegments = logSegments; - listener.onSegmentsUpdated(logSegments.getValue()); + listener.onSegmentsUpdated(logSegments); } } @@ -309,7 +302,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } switch (event.getType()) { case NodeDeleted: - listeners.remove(path); + notifyLogStreamDeleted(path, listeners.remove(path)); break; case NodeChildrenChanged: new ReadLogSegmentsTask(path, this).run(); @@ -324,17 +317,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck); } - @Override - public Future<List<String>> getLogSegmentNames(String logSegmentsPath) { - return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>, List<String>>() { - @Override - public List<String> apply(Versioned<List<String>> list) { - return list.getValue(); - } - }); - } - - Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) { + Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) { Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>(); try { zkc.get().getChildren(logSegmentsPath, watcher, this, result); @@ -354,46 +337,59 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch /** cversion: the number of changes to the children of this znode **/ ZkVersion zkVersion = new ZkVersion(stat.getCversion()); result.setValue(new Versioned(children, zkVersion)); + } else if (KeeperException.Code.NONODE.intValue() == rc) { + result.setException(new LogNotFoundException("Log " + path + " not found")); } else { - result.setException(KeeperException.create(KeeperException.Code.get(rc))); + result.setException(new 
ZKException("Failed to get log segments from " + path, + KeeperException.Code.get(rc))); } } @Override - public void registerLogSegmentListener(String logSegmentsPath, - LogSegmentNamesListener listener) { + public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, + LogSegmentNamesListener listener) { + Watcher zkWatcher; if (null == listener) { - return; - } - closeLock.readLock().lock(); - try { - if (closed) { - return; - } - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = - listeners.get(logSegmentsPath); - if (null == listenerSet) { - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet = - new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>(); - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet = - listeners.putIfAbsent(logSegmentsPath, newListenerSet); - if (null != oldListenerSet) { - listenerSet = oldListenerSet; + zkWatcher = null; + } else { + closeLock.readLock().lock(); + try { + if (closed) { + zkWatcher = null; } else { - listenerSet = newListenerSet; - } - } - synchronized (listenerSet) { - listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener)); - if (!listeners.containsKey(logSegmentsPath)) { - // listener set has been removed, add it back - listeners.put(logSegmentsPath, listenerSet); + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = + listeners.get(logSegmentsPath); + if (null == listenerSet) { + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet = + new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>(); + Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet = + listeners.putIfAbsent(logSegmentsPath, newListenerSet); + if (null != oldListenerSet) { + listenerSet = oldListenerSet; + } else { + listenerSet = newListenerSet; + } + } + synchronized (listenerSet) { + listenerSet.put(listener, new 
VersionedLogSegmentNamesListener(listener)); + if (!listeners.containsKey(logSegmentsPath)) { + // listener set has been removed, add it back + if (null != listeners.putIfAbsent(logSegmentsPath, listenerSet)) { + logger.debug("Listener set is already found for log segments path {}", logSegmentsPath); + } + } + } + zkWatcher = ZKLogSegmentMetadataStore.this; } + } finally { + closeLock.readLock().unlock(); } - new ReadLogSegmentsTask(logSegmentsPath, this).run(); - } finally { - closeLock.readLock().unlock(); } + Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher); + if (null != listener) { + getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this)); + } + return getLogSegmentNamesResult; } @Override @@ -433,4 +429,38 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } } + // Notifications + + void notifyLogStreamDeleted(String logSegmentsPath, + final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners) { + if (null == listeners) { + return; + } + this.submitTask(logSegmentsPath, new Runnable() { + @Override + public void run() { + for (LogSegmentNamesListener listener : listeners.keySet()) { + listener.onLogStreamDeleted(); + } + } + }); + + } + + void notifyLogSegmentsUpdated(String logSegmentsPath, + final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners, + final Versioned<List<String>> segments) { + if (null == listeners) { + return; + } + this.submitTask(logSegmentsPath, new Runnable() { + @Override + public void run() { + for (VersionedLogSegmentNamesListener listener : listeners.values()) { + listener.onSegmentsUpdated(segments); + } + } + }); + } + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java deleted file mode 100644 index 9716f95..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentCache.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Managing log segments in local cache. 
- * - * <p> - * Caching of log segment metadata assumes that the data contained in the ZNodes for individual - * log segments is never updated after creation i.e we never call setData. A log segment - * is finalized by creating a new ZNode and deleting the in progress node. This code will have - * to change if we change the behavior - * </p> - */ -public class LogSegmentCache { - - static final Logger LOG = LoggerFactory.getLogger(LogSegmentCache.class); - - protected final String streamName; - protected final Map<String, LogSegmentMetadata> logSegments = - new HashMap<String, LogSegmentMetadata>(); - protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments = - new ConcurrentHashMap<Long, LogSegmentMetadata>(); - - public LogSegmentCache(String streamName) { - this.streamName = streamName; - } - - /** - * Retrieve log segments from the cache. - * - * - first sort the log segments in ascending order - * - do validation and assign corresponding sequence id - * - apply comparator after validation - * - * @param comparator - * comparator to sort the returned log segments. - * @return list of sorted and filtered log segments. - * @throws UnexpectedException if unexpected condition detected (e.g. ledger sequence number gap) - */ - public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator) - throws UnexpectedException { - List<LogSegmentMetadata> segmentsToReturn; - synchronized (logSegments) { - segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size()); - segmentsToReturn.addAll(logSegments.values()); - } - Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR); - long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; - LogSegmentMetadata prevSegment = null; - for (int i = 0; i < segmentsToReturn.size(); i++) { - LogSegmentMetadata segment = segmentsToReturn.get(i); - - // validation on ledger sequence number - // - we are ok that if there are same log segments exist. 
it is just same log segment in different - // states (inprogress vs completed). it could happen during completing log segment without transaction - if (null != prevSegment - && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value - && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value - && prevSegment.getLogSegmentSequenceNumber() != segment.getLogSegmentSequenceNumber() - && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) { - LOG.error("{} found ledger sequence number gap between log segment {} and {}", - new Object[] { streamName, prevSegment, segment }); - throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment " - + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber()); - } - - // assign sequence id - if (!segment.isInProgress()) { - if (segment.supportsSequenceId()) { - startSequenceId = segment.getStartSequenceId() + segment.getRecordCount(); - if (null != prevSegment && prevSegment.supportsSequenceId() - && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) { - LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}", - new Object[] { streamName, segment, prevSegment }); - } - } else { - startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; - } - } else { - if (segment.supportsSequenceId()) { - LogSegmentMetadata newSegment = segment.mutator() - .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 0L : startSequenceId) - .build(); - segmentsToReturn.set(i, newSegment); - } - break; - } - prevSegment = segment; - } - if (comparator != LogSegmentMetadata.COMPARATOR) { - Collections.sort(segmentsToReturn, comparator); - } - return segmentsToReturn; - } - - /** - * Add the segment <i>metadata</i> for <i>name</i> in the cache. 
- * - * @param name - * segment name. - * @param metadata - * segment metadata. - */ - public void add(String name, LogSegmentMetadata metadata) { - synchronized (logSegments) { - if (!logSegments.containsKey(name)) { - logSegments.put(name, metadata); - LOG.info("{} added log segment ({} : {}) to cache.", - new Object[]{ streamName, name, metadata }); - } - LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLedgerId()); - if (null == oldMetadata) { - lid2LogSegments.put(metadata.getLedgerId(), metadata); - } else { - if (oldMetadata.isInProgress() && !metadata.isInProgress()) { - lid2LogSegments.put(metadata.getLedgerId(), metadata); - } else { - lid2LogSegments.put(oldMetadata.getLedgerId(), oldMetadata); - } - } - } - } - - /** - * Retrieve log segment <code>name</code> from the cache. - * - * @param name - * name of the log segment. - * @return log segment metadata - */ - public LogSegmentMetadata get(String name) { - synchronized (logSegments) { - return logSegments.get(name); - } - } - - /** - * Update the log segment cache with removed/added segments. - * - * @param segmentsRemoved - * segments that removed - * @param segmentsAdded - * segments that added - */ - public void update(Set<String> segmentsRemoved, - Map<String, LogSegmentMetadata> segmentsAdded) { - synchronized (logSegments) { - for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) { - add(entry.getKey(), entry.getValue()); - } - for (String segment : segmentsRemoved) { - remove(segment); - } - } - } - - /** - * Diff with new received segment list <code>segmentReceived</code>. - * - * @param segmentsReceived - * new received segment list - * @return segments added (left) and removed (right). 
- */ - public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) { - Set<String> segmentsAdded; - Set<String> segmentsRemoved; - synchronized (logSegments) { - Set<String> segmentsCached = logSegments.keySet(); - segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy(); - segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy(); - } - return Pair.of(segmentsAdded, segmentsRemoved); - } - - /** - * Remove log segment <code>name</code> from the cache. - * - * @param name - * name of the log segment. - * @return log segment metadata. - */ - public LogSegmentMetadata remove(String name) { - synchronized (logSegments) { - LogSegmentMetadata metadata = logSegments.remove(name); - if (null != metadata) { - lid2LogSegments.remove(metadata.getLedgerId(), metadata); - LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata); - } - return metadata; - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java new file mode 100644 index 0000000..d4ca3ea --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.logsegment; + +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.LogSegmentMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Cache the log segment metadata + */ +public class LogSegmentMetadataCache implements RemovalListener<String, LogSegmentMetadata> { + + private static final Logger logger = LoggerFactory.getLogger(LogSegmentMetadataCache.class); + + private final Cache<String, LogSegmentMetadata> cache; + private final boolean isCacheEnabled; + + public LogSegmentMetadataCache(DistributedLogConfiguration conf, + Ticker ticker) { + cache = CacheBuilder.newBuilder() + .concurrencyLevel(conf.getNumWorkerThreads()) + .initialCapacity(1024) + .expireAfterAccess(conf.getLogSegmentCacheTTLMs(), TimeUnit.MILLISECONDS) + .maximumSize(conf.getLogSegmentCacheMaxSize()) + .removalListener(this) + .ticker(ticker) + .recordStats() + .build(); + this.isCacheEnabled = conf.isLogSegmentCacheEnabled(); + logger.info("Log segment cache is enabled = {}", this.isCacheEnabled); + } + + /** + * Add the log <i>segment</i> of <i>path</i> to the cache. 
+ * + * @param path the path of the log segment + * @param segment log segment metadata + */ + public void put(String path, LogSegmentMetadata segment) { + if (isCacheEnabled) { + cache.put(path, segment); + } + } + + /** + * Invalidate the cache entry associated with <i>path</i>. + * + * @param path the path of the log segment + */ + public void invalidate(String path) { + if (isCacheEnabled) { + cache.invalidate(path); + } + } + + /** + * Retrieve the log segment of <i>path</i> from the cache. + * + * @param path the path of the log segment. + * @return log segment metadata if it exists, otherwise null. + */ + public LogSegmentMetadata get(String path) { + return cache.getIfPresent(path); + } + + @Override + public void onRemoval(RemovalNotification<String, LogSegmentMetadata> notification) { + if (notification.wasEvicted()) { + if (logger.isDebugEnabled()) { + logger.debug("Log segment of {} was evicted.", notification.getKey()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java index 430e15f..2ea1671 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java @@ -130,24 +130,17 @@ public interface LogSegmentMetadataStore extends Closeable { Future<LogSegmentMetadata> getLogSegment(String logSegmentPath); /** - * Retrieve the list of log segments under <code>logSegmentsPath</code>. 
+ * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i> + * for subsequent changes for the list of log segments. * * @param logSegmentsPath * path to store list of log segments - * @return future of the retrieved list of log segment names - */ - Future<List<String>> getLogSegmentNames(String logSegmentsPath); - - /** - * Register a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>. - * - * @param logSegmentsPath - * log segments path * @param listener * log segment listener on log segment changes + * @return future of the retrieved list of log segment names */ - void registerLogSegmentListener(String logSegmentsPath, - LogSegmentNamesListener listener); + Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, + LogSegmentNamesListener listener); /** * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java new file mode 100644 index 0000000..f242941 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.logsegment; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Managing log segments in local cache. + * + * <p> + * Caching of log segment metadata assumes that the data contained in the ZNodes for individual + * log segments is never updated after creation i.e we never call setData. A log segment + * is finalized by creating a new ZNode and deleting the in progress node. 
This code will have + * to change if we change the behavior + * </p> + */ +public class PerStreamLogSegmentCache { + + static final Logger LOG = LoggerFactory.getLogger(PerStreamLogSegmentCache.class); + + protected final String streamName; + protected final boolean validateLogSegmentSequenceNumber; + protected final Map<String, LogSegmentMetadata> logSegments = + new HashMap<String, LogSegmentMetadata>(); + protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments = + new ConcurrentHashMap<Long, LogSegmentMetadata>(); + + @VisibleForTesting + PerStreamLogSegmentCache(String streamName) { + this(streamName, true); + } + + public PerStreamLogSegmentCache(String streamName, + boolean validateLogSegmentSequenceNumber) { + this.streamName = streamName; + this.validateLogSegmentSequenceNumber = validateLogSegmentSequenceNumber; + } + + /** + * Retrieve log segments from the cache. + * + * - first sort the log segments in ascending order + * - do validation and assign corresponding sequence id + * - apply comparator after validation + * + * @param comparator + * comparator to sort the returned log segments. + * @return list of sorted and filtered log segments. + * @throws UnexpectedException if unexpected condition detected (e.g. ledger sequence number gap) + */ + public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator) + throws UnexpectedException { + List<LogSegmentMetadata> segmentsToReturn; + synchronized (logSegments) { + segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size()); + segmentsToReturn.addAll(logSegments.values()); + } + Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR); + + LogSegmentMetadata prevSegment = null; + if (validateLogSegmentSequenceNumber) { + // validation ledger sequence number to ensure the log segments are unique. 
+ for (int i = 0; i < segmentsToReturn.size(); i++) { + LogSegmentMetadata segment = segmentsToReturn.get(i); + + if (null != prevSegment + && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value + && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value + && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) { + LOG.error("{} found ledger sequence number gap between log segment {} and {}", + new Object[] { streamName, prevSegment, segment }); + throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment " + + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber()); + } + prevSegment = segment; + } + } + + prevSegment = null; + long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; + for (int i = 0; i < segmentsToReturn.size(); i++) { + LogSegmentMetadata segment = segmentsToReturn.get(i); + // assign sequence id + if (!segment.isInProgress()) { + if (segment.supportsSequenceId()) { + startSequenceId = segment.getStartSequenceId() + segment.getRecordCount(); + if (null != prevSegment && prevSegment.supportsSequenceId() + && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) { + LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}", + new Object[] { streamName, segment, prevSegment }); + } + } else { + startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; + } + } else { + if (segment.supportsSequenceId()) { + LogSegmentMetadata newSegment = segment.mutator() + .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 
0L : startSequenceId) + .build(); + segmentsToReturn.set(i, newSegment); + } + + break; + } + prevSegment = segment; + } + if (comparator != LogSegmentMetadata.COMPARATOR) { + Collections.sort(segmentsToReturn, comparator); + } + return segmentsToReturn; + } + + /** + * Add the segment <i>metadata</i> for <i>name</i> in the cache. + * + * @param name + * segment name. + * @param metadata + * segment metadata. + */ + public void add(String name, LogSegmentMetadata metadata) { + synchronized (logSegments) { + if (!logSegments.containsKey(name)) { + logSegments.put(name, metadata); + LOG.info("{} added log segment ({} : {}) to cache.", + new Object[]{ streamName, name, metadata }); + } + LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLedgerId()); + if (null == oldMetadata) { + lid2LogSegments.put(metadata.getLedgerId(), metadata); + } else { + if (oldMetadata.isInProgress() && !metadata.isInProgress()) { + lid2LogSegments.put(metadata.getLedgerId(), metadata); + } else { + lid2LogSegments.put(oldMetadata.getLedgerId(), oldMetadata); + } + } + } + } + + /** + * Retrieve log segment <code>name</code> from the cache. + * + * @param name + * name of the log segment. + * @return log segment metadata + */ + public LogSegmentMetadata get(String name) { + synchronized (logSegments) { + return logSegments.get(name); + } + } + + /** + * Update the log segment cache with removed/added segments. + * + * @param segmentsRemoved + * segments that removed + * @param segmentsAdded + * segments that added + */ + public void update(Set<String> segmentsRemoved, + Map<String, LogSegmentMetadata> segmentsAdded) { + synchronized (logSegments) { + for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) { + add(entry.getKey(), entry.getValue()); + } + for (String segment : segmentsRemoved) { + remove(segment); + } + } + } + + /** + * Diff with new received segment list <code>segmentReceived</code>. 
+ * + * @param segmentsReceived + * new received segment list + * @return segments added (left) and removed (right). + */ + public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) { + Set<String> segmentsAdded; + Set<String> segmentsRemoved; + synchronized (logSegments) { + Set<String> segmentsCached = logSegments.keySet(); + segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy(); + segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy(); + } + return Pair.of(segmentsAdded, segmentsRemoved); + } + + /** + * Remove log segment <code>name</code> from the cache. + * + * @param name + * name of the log segment. + * @return log segment metadata. + */ + public LogSegmentMetadata remove(String name) { + synchronized (logSegments) { + LogSegmentMetadata metadata = logSegments.remove(name); + if (null != metadata) { + lid2LogSegments.remove(metadata.getLedgerId(), metadata); + LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata); + } + return metadata; + } + } + + +}