http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index 5c15009..9ba8ca4 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -24,7 +24,6 @@ import com.twitter.distributedlog.AsyncNotification; import com.twitter.distributedlog.BKLogHandler; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; import com.twitter.distributedlog.LedgerDescriptor; import com.twitter.distributedlog.LedgerHandleCache; import com.twitter.distributedlog.LedgerReadPosition; @@ -32,13 +31,16 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogReadException; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ReadAheadCache; -import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.callback.LogSegmentListener; import com.twitter.distributedlog.callback.ReadAheadCallback; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.io.AsyncCloseable; +import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; @@ -55,10 +57,8 @@ import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function1; @@ -97,7 +97,7 @@ import java.util.concurrent.atomic.AtomicInteger; * Exceptions Handling Phase: Handle all the exceptions and properly schedule next readahead request. * </p> */ -public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, AsyncCloseable { +public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncCloseable, LogSegmentListener { private static final Logger LOG = LoggerFactory.getLogger(ReadAheadWorker.class); @@ -115,7 +115,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As protected final AsyncNotification notification; // resources - private final ZooKeeperClient zkc; protected final OrderedScheduler scheduler; private final LedgerHandleCache handleCache; private final ReadAheadCache readAheadCache; @@ -144,10 +143,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As // LogSegments & Metadata Notification // - // variables related to getting log segments from zookeeper - volatile boolean zkNotificationDisabled = false; - private final Watcher getLedgersWatcher; - // variables related to zookeeper watcher notification to interrupt long poll waits final Object notificationLock = new Object(); AsyncNotification metadataNotification = null; @@ -155,11 +150,13 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As // variables related to log segments private volatile boolean reInitializeMetadata = true; + private volatile boolean forceReadLogSegments = false; volatile boolean inProgressChanged = false; private LogSegmentMetadata currentMetadata = null; private int currentMetadataIndex; protected LedgerDescriptor currentLH; - private volatile List<LogSegmentMetadata> ledgerList; + private volatile List<LogSegmentMetadata> logSegmentListNotified; + private volatile List<LogSegmentMetadata> logSegmentList; // // ReadAhead Phases @@ -208,7 +205,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As DynamicDistributedLogConfiguration dynConf, ZKLogMetadataForReader logMetadata, BKLogHandler ledgerManager, - ZooKeeperClient zkc, OrderedScheduler scheduler, LedgerHandleCache handleCache, LedgerReadPosition startPosition, @@ -229,7 +225,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As this.isHandleForReading = isHandleForReading; this.notification = notification; // Resources - this.zkc = zkc; this.scheduler = scheduler; this.handleCache = handleCache; this.readAheadCache = readAheadCache; @@ -237,8 +232,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As this.startReadPosition = new LedgerReadPosition(startPosition); this.nextReadAheadPosition = new LedgerReadPosition(startPosition); // LogSegments - this.getLedgersWatcher = this.zkc.getWatcherManager() - .registerChildWatcher(logMetadata.getLogSegmentsPath(), this); + // Failure Detection this.failureInjector = failureInjector; // Tracing @@ -283,12 +277,12 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As // ReadAhead Status // - void setReadAheadError(ReadAheadTracker tracker) { + void setReadAheadError(ReadAheadTracker tracker, Throwable cause) { LOG.error("Read Ahead for {} is set to error.", logMetadata.getFullyQualifiedName()); readAheadError = true; tracker.enterPhase(ReadAheadPhase.ERROR); if (null != notification) { - notification.notifyOnError(); + notification.notifyOnError(cause); } if (null != stopPromise) { FutureUtils.setValue(stopPromise, null); @@ -299,7 +293,8 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As readAheadInterrupted = true; tracker.enterPhase(ReadAheadPhase.INTERRUPTED); if (null != notification) { - notification.notifyOnError(); + notification.notifyOnError(new DLInterruptedException("ReadAhead worker for " + + bkLedgerManager.getFullyQualifiedName() + " is interrupted.")); } if (null != stopPromise) { FutureUtils.setValue(stopPromise, null); @@ -310,7 +305,9 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As readingFromTruncated = true; tracker.enterPhase(ReadAheadPhase.TRUNCATED); if (null != notification) { - notification.notifyOnError(); + notification.notifyOnError( + new AlreadyTruncatedTransactionException(logMetadata.getFullyQualifiedName() + + ": Trying to position read ahead to a segment that is marked truncated")); } if (null != stopPromise) { FutureUtils.setValue(stopPromise, null); @@ -347,9 +344,11 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As return !isCatchingUp; } - public void start() { - LOG.debug("Starting ReadAhead Worker for {}", fullyQualifiedName); + public void start(List<LogSegmentMetadata> segmentList) { + LOG.debug("Starting ReadAhead Worker for {} : segments = {}", + fullyQualifiedName, segmentList); running = true; + logSegmentListNotified = segmentList; schedulePhase.process(BKException.Code.OK); } @@ -358,9 +357,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As LOG.info("Stopping Readahead worker for {}", fullyQualifiedName); running = false; - this.zkc.getWatcherManager() - .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this, true); - // Aside from unfortunate naming of variables, this allows // the currently active long poll to be interrupted and completed AsyncNotification notification; @@ -417,7 +413,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As } catch (RuntimeException rte) { LOG.error("ReadAhead on stream {} encountered runtime exception", logMetadata.getFullyQualifiedName(), rte); - setReadAheadError(tracker); + setReadAheadError(tracker, rte); throw rte; } } @@ -455,7 +451,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As try { scheduler.submit(addRTEHandler(runnable)); } catch (RejectedExecutionException ree) { - setReadAheadError(tracker); + setReadAheadError(tracker, ree); } } @@ -470,7 +466,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As scheduler.schedule(addRTEHandler(task), timeInMillis, TimeUnit.MILLISECONDS); readAheadWorkerWaits.inc(); } catch (RejectedExecutionException ree) { - setReadAheadError(tracker); + setReadAheadError(tracker, ree); } } @@ -533,12 +529,14 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As LOG.error("{} : BookKeeper Client used by the ReadAhead Thread has encountered {} zookeeper exceptions : simulate = {}", new Object[] { fullyQualifiedName, bkcZkExceptions.get(), injectErrors }); running = false; - setReadAheadError(tracker); + setReadAheadError(tracker, new LogReadException( + "Encountered too many zookeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName())); } else if (bkcUnExpectedExceptions.get() > BKC_UNEXPECTED_EXCEPTION_THRESHOLD) { LOG.error("{} : ReadAhead Thread has encountered {} unexpected BK exceptions.", fullyQualifiedName, bkcUnExpectedExceptions.get()); running = false; - setReadAheadError(tracker); + setReadAheadError(tracker, new LogReadException( + "Encountered too many unexpected bookkeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName())); } else { // We must always reinitialize metadata if the last attempt to read failed. reInitializeMetadata = true; @@ -629,39 +627,21 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As * Phase on checking in progress changed. */ final class CheckInProgressChangedPhase extends Phase - implements BookkeeperInternalCallbacks.GenericCallback<List<LogSegmentMetadata>> { + implements FutureEventListener<Versioned<List<LogSegmentMetadata>>> { CheckInProgressChangedPhase(Phase next) { super(next); } - @Override - public void operationComplete(final int rc, final List<LogSegmentMetadata> result) { + void processLogSegments(final List<LogSegmentMetadata> segments) { // submit callback execution to dlg executor to avoid deadlock. submit(new Runnable() { @Override public void run() { - if (KeeperException.Code.OK.intValue() != rc) { - if (KeeperException.Code.NONODE.intValue() == rc) { - LOG.info("Log {} has been deleted. Set ReadAhead to error to stop reading.", - logMetadata.getFullyQualifiedName()); - logDeleted = true; - setReadAheadError(tracker); - return; - } - LOG.info("ZK Exception {} while reading ledger list", rc); - reInitializeMetadata = true; - if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { - handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.InterruptedException); - } else { - handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.ZKException); - } - return; - } - ledgerList = result; + logSegmentList = segments; boolean isInitialPositioning = nextReadAheadPosition.definitelyLessThanOrEqualTo(startReadPosition); - for (int i = 0; i < ledgerList.size(); i++) { - LogSegmentMetadata l = ledgerList.get(i); + for (int i = 0; i < logSegmentList.size(); i++) { + LogSegmentMetadata l = logSegmentList.get(i); // By default we should skip truncated segments during initial positioning if (l.isTruncated() && isInitialPositioning && @@ -820,11 +800,34 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As TimeUnit.MILLISECONDS.toMicros(elapsedMillisSinceMetadataChanged)); metadataNotificationTimeMillis = -1L; } - bkLedgerManager.asyncGetLedgerList(LogSegmentMetadata.COMPARATOR, getLedgersWatcher, this); + if (forceReadLogSegments) { + forceReadLogSegments = false; + bkLedgerManager.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null + ).addEventListener(this); + } else { + processLogSegments(logSegmentListNotified); + } } else { next.process(BKException.Code.OK); } } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { + processLogSegments(segments.getValue()); + } + + @Override + public void onFailure(Throwable cause) { + LOG.info("Encountered metadata exception while reading log segments of {} : {}. Retrying ...", + bkLedgerManager.getFullyQualifiedName(), cause.getMessage()); + reInitializeMetadata = true; + forceReadLogSegments = true; + handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.ZKException); + } } final class OpenLedgerPhase extends Phase @@ -922,6 +925,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As LOG.info("{} Ledger {} for inprogress segment {} closed for idle reader warn threshold", new Object[] { fullyQualifiedName, currentMetadata, currentLH }); reInitializeMetadata = true; + forceReadLogSegments = true; } } else { lastLedgerCloseDetected.reset().start(); @@ -986,7 +990,9 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As } if (conf.getPositionGapDetectionEnabled() && gapDetected) { - setReadAheadError(tracker); + setReadAheadError(tracker, new UnexpectedException( + "Unexpected last entry id during read ahead : " + currentMetadata + + ", lac = " + lastAddConfirmed)); } else { // This disconnect will only surface during repositioning and // will not silently miss records; therefore its safe to not halt @@ -1003,14 +1009,16 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As } LogSegmentMetadata oldMetadata = currentMetadata; currentMetadata = null; - if (currentMetadataIndex + 1 < ledgerList.size()) { - currentMetadata = ledgerList.get(++currentMetadataIndex); + if (currentMetadataIndex + 1 < logSegmentList.size()) { + currentMetadata = logSegmentList.get(++currentMetadataIndex); if (currentMetadata.getLogSegmentSequenceNumber() != (oldMetadata.getLogSegmentSequenceNumber() + 1)) { // We should never get here as we should have exited the loop if // pendingRequests were empty alertStatsLogger.raise("Unexpected condition during read ahead; {} , {}", currentMetadata, oldMetadata); - setReadAheadError(tracker); + setReadAheadError(tracker, new UnexpectedException( + "Unexpected condition during read ahead : current metadata " + + currentMetadata + ", old metadata " + oldMetadata)); } else { if (currentMetadata.isTruncated()) { if (conf.getAlertWhenPositioningOnTruncated()) { @@ -1345,37 +1353,26 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As } @Override - public void process(WatchedEvent event) { - if (zkNotificationDisabled) { - return; + public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { + AsyncNotification notification; + synchronized (notificationLock) { + logSegmentListNotified = segments; + reInitializeMetadata = true; + LOG.debug("{} Read ahead node changed", fullyQualifiedName); + notification = metadataNotification; + metadataNotification = null; } - - if ((event.getType() == Watcher.Event.EventType.None) - && (event.getState() == Watcher.Event.KeeperState.SyncConnected)) { - LOG.debug("Reconnected ..."); - } else if (((event.getType() == Event.EventType.None) && (event.getState() == Event.KeeperState.Expired)) || - ((event.getType() == Event.EventType.NodeChildrenChanged))) { - AsyncNotification notification; - synchronized (notificationLock) { - reInitializeMetadata = true; - LOG.debug("{} Read ahead node changed", fullyQualifiedName); - notification = metadataNotification; - metadataNotification = null; - } - metadataNotificationTimeMillis = System.currentTimeMillis(); - if (null != notification) { - notification.notifyOnOperationComplete(); - } - } else if (event.getType() == Event.EventType.NodeDeleted) { - logDeleted = true; - setReadAheadError(tracker); + metadataNotificationTimeMillis = System.currentTimeMillis(); + if (null != notification) { + notification.notifyOnOperationComplete(); } } - @VisibleForTesting - public void disableZKNotification() { - LOG.info("{} ZK Notification was disabled", fullyQualifiedName); - zkNotificationDisabled = true; + @Override + public void onLogStreamDeleted() { + logDeleted = true; + setReadAheadError(tracker, new LogNotFoundException("Log stream " + + bkLedgerManager.getFullyQualifiedName() + " is deleted.")); } /** @@ -1424,7 +1421,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As } @Override - public void notifyOnError() { + public void notifyOnError(Throwable t) { longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos)); execute(); } @@ -1477,7 +1474,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As abstract void doComplete(boolean success); @Override - public void notifyOnError() { + public void notifyOnError(Throwable cause) { longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos)); complete(false); }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java index 6a5f7a7..6a647a9 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java @@ -343,9 +343,10 @@ public class FutureUtils { */ public static Throwable zkException(Throwable throwable, String path) { if (throwable instanceof KeeperException) { - return throwable; + return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable); } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) { - return KeeperException.create(KeeperException.Code.CONNECTIONLOSS, path); + return new ZKException("Encountered zookeeper connection loss on " + path, + KeeperException.Code.CONNECTIONLOSS); } else if (throwable instanceof InterruptedException) { return new DLInterruptedException("Interrupted on operating " + path, throwable); } else { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java index 57b576e..c588cd7 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java @@ -18,6 +18,7 @@ package com.twitter.distributedlog; import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter; +import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.namespace.DistributedLogNamespace; @@ -185,7 +186,12 @@ public class DLMTestUtil { BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(name, conf, uri); try { BKLogReadHandler readHandler = dlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = readHandler.getFullLedgerList(true, true); + List<LogSegmentMetadata> ledgerList = FutureUtils.result( + readHandler.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null) + ).getValue(); LogSegmentMetadata lastSegment = ledgerList.get(ledgerList.size() - 1); BookKeeperClient bkc = dlm.getWriterBKC(); LedgerHandle lh = bkc.get().openLedger(lastSegment.getLedgerId(), @@ -415,10 +421,12 @@ public class DLMTestUtil { conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes()); String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo); String znodePath = writeHandler.inprogressZNode(lh.getId(), startTxID, logSegmentSeqNo); + int logSegmentMetadataVersion = conf.getDLLedgerMetadataLayoutVersion(); LogSegmentMetadata l = new LogSegmentMetadata.LogSegmentMetadataBuilder(znodePath, - conf.getDLLedgerMetadataLayoutVersion(), lh.getId(), startTxID) + logSegmentMetadataVersion, lh.getId(), startTxID) .setLogSegmentSequenceNo(logSegmentSeqNo) + .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion)) .build(); l.write(dlm.writerZKC); writeHandler.maxTxId.store(startTxID); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java index ae71a1e..fb69c8d 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java @@ -65,7 +65,7 @@ class NonBlockingReadsTestUtil { try { LOG.info("Created reader reading from {}", dlm.getStreamName()); if (forceStall) { - reader.disableReadAheadZKNotification(); + reader.disableReadAheadLogSegmentsNotification(); } long numTrans = 0; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 38aaa5b..0c7f346 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -86,6 +86,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { this.testConf = new DistributedLogConfiguration(); this.testConf.loadConf(conf); this.testConf.setReaderIdleErrorThresholdMillis(1200000); + this.testConf.setReadAheadWaitTimeOnEndOfStream(20); } @Rule @@ -816,7 +817,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { @Ignore @Test(timeout = 120000) public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception { - int count = 50; + // int count = 50; + int count = 1; String name = runtime.getMethodName(); DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); confLocal.loadConf(testConf); @@ -1484,9 +1486,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } @DistributedLogAnnotations.FlakyTest - @Ignore @Test(timeout = 60000) - public void testAsyncReadMissingZKNotification() throws Exception { + public void testAsyncReadMissingLogSegmentsNotification() throws Exception { String name = "distrlog-async-reader-missing-zk-notification"; DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); confLocal.loadConf(testConf); @@ -1501,6 +1502,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { final int segmentSize = 10; final int numSegments = 3; final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch readLatch = new CountDownLatch(1); final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.schedule( new Runnable() { @@ -1515,6 +1517,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); if ((i == 0) && (j == 1)) { latch.countDown(); + } else { + // wait for reader to start + readLatch.await(); } } writer.closeAndComplete(); @@ -1530,13 +1535,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { latch.await(); BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN)dlm.getAsyncLogReader(DLSN.InitialDLSN); - reader.disableReadAheadZKNotification(); + reader.disableReadAheadLogSegmentsNotification(); boolean exceptionEncountered = false; int recordCount = 0; try { while (true) { Future<LogRecordWithDLSN> record = reader.readNext(); Await.result(record); + if (recordCount == 0) { + readLatch.countDown(); + } recordCount++; if (recordCount >= segmentSize * numSegments) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java index c0789ec..6ad9950 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogManager.java @@ -979,13 +979,18 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { return; } if (updates >= 1) { - if (segments.get(0).getLogSegmentSequenceNumber() != updates) { + if (segments.get(segments.size() - 1).getLogSegmentSequenceNumber() != updates) { numFailures.incrementAndGet(); } } receivedStreams.set(segments); latches[updates].countDown(); } + + @Override + public void onLogStreamDeleted() { + // no-op + } }); LOG.info("Registered listener for stream {}.", name); long txid = 1; @@ -1006,12 +1011,12 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { assertEquals(0, numFailures.get()); assertNotNull(receivedStreams.get()); assertEquals(numSegments, receivedStreams.get().size()); - int seqno = numSegments; + int seqno = 1; for (LogSegmentMetadata m : receivedStreams.get()) { assertEquals(seqno, m.getLogSegmentSequenceNumber()); assertEquals((seqno - 1) * DEFAULT_SEGMENT_SIZE + 1, m.getFirstTxId()); assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId()); - --seqno; + ++seqno; } } @@ -1122,6 +1127,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { confLocal.loadConf(conf); confLocal.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION); confLocal.setOutputBufferSize(0); + confLocal.setLogSegmentCacheEnabled(false); LogSegmentMetadataStore metadataStore = new ZKLogSegmentMetadataStore(confLocal, zookeeperClient, scheduler); @@ -1174,7 +1180,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { { LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); LogRecordWithDLSN record = reader.readNext(false); - assertTrue((record != null) && (record.getDlsn().compareTo(new DLSN(2, 0, 0)) == 0)); + assertTrue("Unexpected record : " + record, + (record != null) && (record.getDlsn().compareTo(new DLSN(2, 0, 0)) == 0)); reader.close(); } @@ -1225,7 +1232,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase { BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); Assert.assertTrue(Await.result(writer.truncate(truncDLSN))); BKLogWriteHandler handler = writer.getCachedWriteHandler(); - List<LogSegmentMetadata> cachedSegments = handler.getFullLedgerList(false, false); + List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR); for (LogSegmentMetadata segment: cachedSegments) { if (segment.getLastDLSN().compareTo(truncDLSN) < 0) { Assert.assertTrue(segment.isTruncated()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java index 48c07ed..4b17500 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogReadHandler.java @@ -20,6 +20,7 @@ package com.twitter.distributedlog; import com.google.common.base.Optional; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; +import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Utils; import com.twitter.util.Duration; @@ -28,12 +29,9 @@ import com.twitter.util.Await; import java.util.List; import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import com.twitter.util.TimeoutException; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -89,76 +87,6 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { } @Test(timeout = 60000) - public void testGetLedgerList() throws Exception { - String dlName = runtime.getMethodName(); - prepareLogSegments(dlName, 3, 3); - BKDistributedLogManager dlm = createNewDLM(conf, dlName); - BKLogReadHandler readHandler = dlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false); - List<LogSegmentMetadata> ledgerList2 = readHandler.getFilteredLedgerList(true, false); - List<LogSegmentMetadata> ledgerList3 = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false); - assertEquals(3, ledgerList.size()); - assertEquals(3, ledgerList2.size()); - assertEquals(3, ledgerList3.size()); - for (int i=0; i<3; i++) { - assertEquals(ledgerList3.get(i), ledgerList2.get(i)); - } - } - - @Test(timeout = 60000) - public void testForceGetLedgerList() throws Exception { - String dlName = runtime.getMethodName(); - prepareLogSegments(dlName, 3, 3); - BKDistributedLogManager dlm = createNewDLM(conf, dlName); - BKLogReadHandler readHandler = dlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(true, false, LogSegmentMetadata.COMPARATOR, false); - final AtomicReference<List<LogSegmentMetadata>> resultHolder = - new AtomicReference<List<LogSegmentMetadata>>(null); - final CountDownLatch latch = new CountDownLatch(1); - readHandler.asyncGetLedgerList(LogSegmentMetadata.COMPARATOR, null, new BookkeeperInternalCallbacks.GenericCallback<List<LogSegmentMetadata>>() { - @Override - public void operationComplete(int rc, List<LogSegmentMetadata> result) { - resultHolder.set(result); - latch.countDown(); - } - }); - latch.await(); - List<LogSegmentMetadata> newLedgerList = resultHolder.get(); - assertNotNull(newLedgerList); - LOG.info("Force sync get list : {}", ledgerList); - LOG.info("Async get list : {}", newLedgerList); - assertEquals(3, ledgerList.size()); - assertEquals(3, newLedgerList.size()); - for (int i=0; i<3; i++) { - assertEquals(ledgerList.get(i), newLedgerList.get(i)); - } - } - - @Test(timeout = 60000) - public void testGetFilteredLedgerListInWriteHandler() throws Exception { - String dlName = runtime.getMethodName(); - prepareLogSegments(dlName, 11, 3); - BKDistributedLogManager dlm = createNewDLM(conf, dlName); - - // Get full list. - BKLogWriteHandler writeHandler0 = dlm.createWriteHandler(false); - List<LogSegmentMetadata> cachedFullLedgerList = - writeHandler0.getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); - assertTrue(cachedFullLedgerList.size() <= 1); - List<LogSegmentMetadata> fullLedgerList = writeHandler0.getFullLedgerListDesc(false, false); - assertEquals(11, fullLedgerList.size()); - - // Get filtered list. - BKLogWriteHandler writeHandler1 = dlm.createWriteHandler(false); - List<LogSegmentMetadata> filteredLedgerListDesc = writeHandler1.getFilteredLedgerListDesc(false, false); - assertEquals(1, filteredLedgerListDesc.size()); - assertEquals(fullLedgerList.get(0), filteredLedgerListDesc.get(0)); - List<LogSegmentMetadata> filteredLedgerList = writeHandler1.getFilteredLedgerList(false, false); - assertEquals(1, filteredLedgerList.size()); - assertEquals(fullLedgerList.get(0), filteredLedgerList.get(0)); - } - - @Test(timeout = 60000) public void testGetFirstDLSNWithOpenLedger() throws Exception { String dlName = runtime.getMethodName(); @@ -362,7 +290,13 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); BKLogReadHandler readHandler = bkdlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false); + List<LogSegmentMetadata> ledgerList = FutureUtils.result( + readHandler.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null + ) + ).getValue(); assertEquals(1, ledgerList.size()); assertTrue(ledgerList.get(0).isInProgress()); @@ -386,7 +320,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase { Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false))); BKLogReadHandler readHandler = bkdlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false); + List<LogSegmentMetadata> ledgerList = FutureUtils.result( + readHandler.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null) + ).getValue(); assertEquals(2, ledgerList.size()); assertFalse(ledgerList.get(0).isInProgress()); assertTrue(ledgerList.get(1).isInProgress()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java index d28e2c6..71b6834 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAhead.java @@ -139,14 +139,6 @@ public class TestReadAhead extends TestDistributedLogBase { } Thread.sleep(1000); - // Expire the session, so the readahead should be awaken from backoff - ZooKeeperClientUtils.expireSession(reader.bkLedgerManager.zooKeeperClient, zkServers, 1000); - AsyncNotification notification2; - do { - Thread.sleep(200); - notification2 = reader.bkLedgerManager.readAheadWorker.getMetadataNotification(); - } while (null == notification2 || notification1 == notification2); - // write another record BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java index e014311..998c7ba 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadUtils.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Utils; import com.twitter.util.Await; @@ -91,7 +92,12 @@ public class TestReadUtils extends TestDistributedLogBase { private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception { BKLogReadHandler readHandler = bkdlm.createReadHandler(); - List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false); + List<LogSegmentMetadata> ledgerList = FutureUtils.result( + readHandler.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null) + ).getValue(); final LedgerHandleCache handleCache = LedgerHandleCache.newBuilder() .bkc(bkdlm.getWriterBKC()) .conf(conf) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java index 555b1b0..a7dead4 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java @@ -99,6 +99,7 @@ public class TestDLCK extends TestDistributedLogBase { confLocal.setImmediateFlushEnabled(true); confLocal.setOutputBufferSize(0); confLocal.setLogSegmentSequenceNumberValidationEnabled(false); + confLocal.setLogSegmentCacheEnabled(false); URI uri = createDLMURI("/check-and-repair-dl-namespace"); zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); com.twitter.distributedlog.DistributedLogManagerFactory factory = http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java index 6e6b83a..66d7228 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDistributedLogAdmin.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.annotations.DistributedLogAnnotations; +import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; @@ -46,7 +47,6 @@ import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Future; -import com.twitter.util.TimeoutException; import static org.junit.Assert.*; @@ -81,11 +81,19 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); confLocal.addConfiguration(conf); confLocal.setLogSegmentSequenceNumberValidationEnabled(false); + confLocal.setLogSegmentCacheEnabled(false); + + DistributedLogConfiguration readConf = new DistributedLogConfiguration(); + readConf.addConfiguration(conf); + readConf.setLogSegmentCacheEnabled(false); + readConf.setLogSegmentSequenceNumberValidationEnabled(true); URI uri = createDLMURI("/change-sequence-number"); zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); com.twitter.distributedlog.DistributedLogManagerFactory factory = new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri); + com.twitter.distributedlog.DistributedLogManagerFactory readFactory = + new com.twitter.distributedlog.DistributedLogManagerFactory(readConf, uri); String streamName = "change-sequence-number"; @@ -96,61 +104,64 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { dlm.close(); // create a reader - DistributedLogManager readDLM = factory.createDistributedLogManagerWithSharedClients(streamName); + DistributedLogManager readDLM = readFactory.createDistributedLogManagerWithSharedClients(streamName); AsyncLogReader reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN); // read the records long expectedTxId = 1L; + DLSN lastDLSN = DLSN.InitialDLSN; for (int i = 0; i < 4 * 10; i++) { - LogRecord record = Await.result(reader.readNext()); + LogRecordWithDLSN record = Await.result(reader.readNext()); assertNotNull(record); DLMTestUtil.verifyLogRecord(record); assertEquals(expectedTxId, record.getTransactionId()); expectedTxId++; + lastDLSN = record.getDlsn(); } + LOG.info("Injecting bad log segment '3'"); + dlm = factory.createDistributedLogManagerWithSharedClients(streamName); DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(dlm, confLocal, 3L, 5 * 10 + 1, true, 10, false); - // Wait for reader to be aware of new log segments - TimeUnit.SECONDS.sleep(2); + LOG.info("Injected bad log segment '3'"); - DLSN dlsn = readDLM.getLastDLSN(); - assertTrue(dlsn.compareTo(new DLSN(5, Long.MIN_VALUE, Long.MIN_VALUE)) < 0); - assertTrue(dlsn.compareTo(new DLSN(4, -1, Long.MIN_VALUE)) > 0); // there isn't records should be read Future<LogRecordWithDLSN> readFuture = reader.readNext(); try { - Await.result(readFuture, Duration.fromMilliseconds(1000)); - fail("Should fail reading next when there is a corrupted log segment"); - } catch (TimeoutException te) { + LogRecordWithDLSN record = Await.result(readFuture); + fail("Should fail reading next record " + + record + + " when there is a corrupted log segment"); + } catch (UnexpectedException ue) { // expected } + LOG.info("Dryrun fix inprogress segment that has lower sequence number"); + // Dryrun DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory, new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)), streamName, false, false); - // Wait for reader to be aware of new log segments - TimeUnit.SECONDS.sleep(2); - - dlsn = readDLM.getLastDLSN(); - assertTrue(dlsn.compareTo(new DLSN(5, Long.MIN_VALUE, Long.MIN_VALUE)) < 0); - assertTrue(dlsn.compareTo(new DLSN(4, -1, Long.MIN_VALUE)) > 0); - // there isn't records should be read try { - Await.result(readFuture, Duration.fromMilliseconds(1000)); + reader = readDLM.getAsyncLogReader(lastDLSN); + Await.result(reader.readNext()); fail("Should fail reading next when there is a corrupted log segment"); - } catch (TimeoutException te) { + } catch (UnexpectedException ue) { // expected } + LOG.info("Actual run fix inprogress segment that has lower sequence number"); + // Actual run DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(factory, LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)), streamName, false, false); - // Wait for reader to be aware of new log segments - TimeUnit.SECONDS.sleep(2); + // be able to read more after fix + reader = readDLM.getAsyncLogReader(lastDLSN); + // skip the first record + Await.result(reader.readNext()); + readFuture = reader.readNext(); expectedTxId = 51L; LogRecord record = Await.result(readFuture); @@ -167,15 +178,11 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase { expectedTxId++; } - dlsn = readDLM.getLastDLSN(); - LOG.info("LastDLSN after fix inprogress segment : {}", dlsn); - assertTrue(dlsn.compareTo(new DLSN(7, Long.MIN_VALUE, Long.MIN_VALUE)) < 0); - assertTrue(dlsn.compareTo(new DLSN(6, -1, Long.MIN_VALUE)) > 0); - Utils.close(reader); readDLM.close(); dlm.close(); factory.close(); + readFactory.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java index f8fd3eb..d874274 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java @@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory; import java.net.URI; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -311,7 +312,8 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Collections.sort(children); assertEquals("Should find 10 log segments", 10, children.size()); - List<String> logSegmentNames = FutureUtils.result(lsmStore.getLogSegmentNames(rootPath)); + List<String> logSegmentNames = + FutureUtils.result(lsmStore.getLogSegmentNames(rootPath, null)).getValue(); Collections.sort(logSegmentNames); assertEquals("Should find 10 log segments", 10, logSegmentNames.size()); @@ -331,9 +333,13 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testRegisterListenerAfterLSMStoreClosed() throws Exception { lsmStore.close(); LogSegmentMetadata segment = createLogSegment(1L); - lsmStore.registerLogSegmentListener(segment.getZkPath(), new LogSegmentNamesListener() { + lsmStore.getLogSegmentNames(segment.getZkPath(), new LogSegmentNamesListener() { @Override - public void onSegmentsUpdated(List<String> segments) { + public void onSegmentsUpdated(Versioned<List<String>> segments) { + // no-op; + } + @Override + public void onLogStreamDeleted() { // no-op; } }); @@ -358,13 +364,17 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2); LogSegmentNamesListener listener = new LogSegmentNamesListener() { @Override - public void onSegmentsUpdated(List<String> segments) { + public void onSegmentsUpdated(Versioned<List<String>> segments) { logger.info("Received segments : {}", segments); - segmentLists.add(segments); + segmentLists.add(segments.getValue()); numNotifications.incrementAndGet(); } + @Override + public void onLogStreamDeleted() { + // no-op; + } }; - lsmStore.registerLogSegmentListener(rootPath, listener); + lsmStore.getLogSegmentNames(rootPath, listener); assertEquals(1, lsmStore.listeners.size()); assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); @@ -420,13 +430,18 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2); LogSegmentNamesListener listener = new LogSegmentNamesListener() { @Override - public void onSegmentsUpdated(List<String> segments) { + public void onSegmentsUpdated(Versioned<List<String>> segments) { logger.info("Received segments : {}", segments); - segmentLists.add(segments); + segmentLists.add(segments.getValue()); numNotifications.incrementAndGet(); } + + @Override + public void onLogStreamDeleted() { + // no-op; + } }; - lsmStore.registerLogSegmentListener(rootPath, listener); + lsmStore.getLogSegmentNames(rootPath, listener); assertEquals(1, lsmStore.listeners.size()); assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); @@ -487,13 +502,18 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2); LogSegmentNamesListener listener = new LogSegmentNamesListener() { @Override - public void onSegmentsUpdated(List<String> segments) { + public void onSegmentsUpdated(Versioned<List<String>> segments) { logger.info("Received segments : {}", segments); - segmentLists.add(segments); + segmentLists.add(segments.getValue()); numNotifications.incrementAndGet(); } + + @Override + public void onLogStreamDeleted() { + // no-op; + } }; - lsmStore.registerLogSegmentListener(rootPath, listener); + lsmStore.getLogSegmentNames(rootPath, listener); assertEquals(1, lsmStore.listeners.size()); assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); @@ -536,6 +556,80 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { } @Test(timeout = 60000) + public void testLogSegmentNamesListenerOnDeletingLogStream() throws Exception { + int numSegments = 3; + Transaction<Object> createTxn = lsmStore.transaction(); + for (int i = 0; i < numSegments; i++) { + LogSegmentMetadata segment = createLogSegment(i); + lsmStore.createLogSegment(createTxn, segment); + } + FutureUtils.result(createTxn.execute()); + String rootPath = "/" + runtime.getMethodName(); + List<String> children = zkc.get().getChildren(rootPath, false); + Collections.sort(children); + + final AtomicInteger numNotifications = new AtomicInteger(0); + final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2); + final CountDownLatch deleteLatch = new CountDownLatch(1); + LogSegmentNamesListener listener = new LogSegmentNamesListener() { + @Override + public void onSegmentsUpdated(Versioned<List<String>> segments) { + logger.info("Received segments : {}", segments); + segmentLists.add(segments.getValue()); + numNotifications.incrementAndGet(); + } + + @Override + public void onLogStreamDeleted() { + deleteLatch.countDown(); + } + }; + lsmStore.getLogSegmentNames(rootPath, listener); + assertEquals(1, lsmStore.listeners.size()); + assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath)); + assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener)); + while (numNotifications.get() < 1) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals("Should receive one segment list update", + 1, numNotifications.get()); + List<String> firstSegmentList = segmentLists.get(0); + Collections.sort(firstSegmentList); + assertEquals("List of segments should be same", + children, firstSegmentList); + + // delete all log segments, it should trigger segment list updated + Transaction<Object> deleteTxn = lsmStore.transaction(); + for (int i = 0; i < numSegments; i++) { + LogSegmentMetadata segment = createLogSegment(i); + lsmStore.deleteLogSegment(deleteTxn, segment); + } + FutureUtils.result(deleteTxn.execute()); + List<String> newChildren = zkc.get().getChildren(rootPath, false); + Collections.sort(newChildren); + while (numNotifications.get() < 2) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertEquals("Should receive second segment list update", + 2, numNotifications.get()); + List<String> secondSegmentList = segmentLists.get(1); + Collections.sort(secondSegmentList); + assertEquals("List of segments should be updated", + 0, secondSegmentList.size()); + assertEquals("List of segments should be updated", + newChildren, secondSegmentList); + + // delete the root path + zkc.get().delete(rootPath, -1); + while (!lsmStore.listeners.isEmpty()) { + TimeUnit.MILLISECONDS.sleep(10); + } + assertTrue("listener should be removed after root path is deleted", + lsmStore.listeners.isEmpty()); + deleteLatch.await(); + } + + @Test(timeout = 60000) public void testStoreMaxLogSegmentSequenceNumber() throws Exception { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java deleted file mode 100644 index 3282f5c..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestLogSegmentCache.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DLMTestUtil; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.Test; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.*; - -/** - * Test Case for Log Segment Cache. - */ -public class TestLogSegmentCache { - - @Test(timeout = 60000) - public void testBasicOperations() { - LogSegmentMetadata metadata = - DLMTestUtil.completedLogSegment("/segment1", 1L, 1L, 100L, 100, 1L, 99L, 0L); - String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L); - - LogSegmentCache cache = new LogSegmentCache("test-basic-operations"); - assertNull("No log segment " + name + " should be cached", cache.get(name)); - cache.add(name, metadata); - LogSegmentMetadata metadataRetrieved = cache.get(name); - assertNotNull("log segment " + name + " should be cached", metadataRetrieved); - assertEquals("Wrong log segment metadata returned for " + name, - metadata, metadataRetrieved); - LogSegmentMetadata metadataRemoved = cache.remove(name); - assertNull("log segment " + name + " should be removed from cache", cache.get(name)); - assertEquals("Wrong log segment metadata removed for " + name, - metadata, metadataRemoved); - assertNull("No log segment " + name + " to be removed", cache.remove(name)); - } - - @Test(timeout = 60000) - public void testDiff() { - LogSegmentCache cache = new LogSegmentCache("test-diff"); - // add 5 completed log segments - for (int i = 1; i <= 5; i++) { - LogSegmentMetadata metadata = - DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L); - String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i); - cache.add(name, metadata); - } - // add one inprogress log segment - LogSegmentMetadata inprogress = - DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6); - String name = DLMTestUtil.inprogressZNodeName(6); - cache.add(name, inprogress); - - // deleted first 2 completed log segments and completed the last one - Set<String> segmentRemoved = Sets.newHashSet(); - for (int i = 1; i <= 2; i++) { - segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); - } - segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6))); - Set<String> segmentReceived = Sets.newHashSet(); - Set<String> segmentAdded = Sets.newHashSet(); - for (int i = 3; i <= 6; i++) { - segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); - if (i == 6) { - segmentAdded.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); - } - } - - Pair<Set<String>, Set<String>> segmentChanges = cache.diff(segmentReceived); - assertTrue("Should remove " + segmentRemoved + ", but removed " + segmentChanges.getRight(), - Sets.difference(segmentRemoved, segmentChanges.getRight()).isEmpty()); - assertTrue("Should add " + segmentAdded + ", but added " + segmentChanges.getLeft(), - Sets.difference(segmentAdded, segmentChanges.getLeft()).isEmpty()); - } - - @Test(timeout = 60000) - public void testUpdate() { - LogSegmentCache cache = new LogSegmentCache("test-update"); - // add 5 completed log segments - for (int i = 1; i <= 5; i++) { - LogSegmentMetadata metadata = - DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L); - String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i); - cache.add(name, metadata); - } - // add one inprogress log segment - LogSegmentMetadata inprogress = - DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6); - String name = DLMTestUtil.inprogressZNodeName(6); - cache.add(name, inprogress); - - // deleted first 2 completed log segments and completed the last one - Set<String> segmentRemoved = Sets.newHashSet(); - for (int i = 1; i <= 2; i++) { - segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); - } - segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6))); - Set<String> segmentReceived = Sets.newHashSet(); - Map<String, LogSegmentMetadata> segmentAdded = Maps.newHashMap(); - for (int i = 3; i <= 6; i++) { - segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); - if (i == 6) { - segmentAdded.put(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i), - DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L)); - } - } - - // update the cache - cache.update(segmentRemoved, segmentAdded); - for (String segment : segmentRemoved) { - assertNull("Segment " + segment + " should be removed.", cache.get(segment)); - } - for (String segment : segmentReceived) { - assertNotNull("Segment " + segment + " should not be removed", cache.get(segment)); - } - for (Map.Entry<String, LogSegmentMetadata> entry : segmentAdded.entrySet()) { - assertEquals("Segment " + entry.getKey() + " should be added.", - entry.getValue(), entry.getValue()); - } - } - - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testGapDetection() throws Exception { - LogSegmentCache cache = new LogSegmentCache("test-gap-detection"); - cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), - DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L)); - cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), - DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L)); - cache.getLogSegments(LogSegmentMetadata.COMPARATOR); - } - - @Test(timeout = 60000) - public void testGapDetectionOnLogSegmentsWithoutLogSegmentSequenceNumber() throws Exception { - LogSegmentCache cache = new LogSegmentCache("test-gap-detection"); - LogSegmentMetadata segment1 = - DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L) - .mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V1_ORIGINAL).build(); - cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), segment1); - LogSegmentMetadata segment3 = - DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L) - .mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO).build(); - cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), segment3); - List<LogSegmentMetadata> expectedList = Lists.asList(segment1, new LogSegmentMetadata[] { segment3 }); - List<LogSegmentMetadata> resultList = cache.getLogSegments(LogSegmentMetadata.COMPARATOR); - assertEquals(expectedList, resultList); - } - - @Test(timeout = 60000) - public void testSameLogSegment() throws Exception { - LogSegmentCache cache = new LogSegmentCache("test-same-log-segment"); - List<LogSegmentMetadata> expectedList = Lists.newArrayListWithExpectedSize(2); - LogSegmentMetadata inprogress = - DLMTestUtil.inprogressLogSegment("/inprogress-1", 1L, 1L, 1L); - expectedList.add(inprogress); - cache.add(DLMTestUtil.inprogressZNodeName(1L), inprogress); - LogSegmentMetadata completed = - DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L); - expectedList.add(completed); - cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), completed); - - List<LogSegmentMetadata> retrievedList = cache.getLogSegments(LogSegmentMetadata.COMPARATOR); - assertEquals("Should get both log segments in ascending order", - expectedList.size(), retrievedList.size()); - for (int i = 0; i < expectedList.size(); i++) { - assertEqualsWithoutSequenceId(expectedList.get(i), retrievedList.get(i)); - } - assertEquals("inprogress log segment should see start sequence id : 0", - 0L, retrievedList.get(0).getStartSequenceId()); - Collections.reverse(expectedList); - retrievedList = cache.getLogSegments(LogSegmentMetadata.DESC_COMPARATOR); - assertEquals("Should get both log segments in descending order", - expectedList.size(), retrievedList.size()); - for (int i = 0; i < expectedList.size(); i++) { - assertEqualsWithoutSequenceId(expectedList.get(i), retrievedList.get(i)); - } - assertEquals("inprogress log segment should see start sequence id : 0", - 0L, retrievedList.get(1).getStartSequenceId()); - } - - private static void assertEqualsWithoutSequenceId(LogSegmentMetadata m1, LogSegmentMetadata m2) { - assertEquals("expected " + m1 + " but got " + m2, - m1.getLogSegmentSequenceNumber(), m2.getLogSegmentSequenceNumber()); - assertEquals("expected " + m1 + " but got " + m2, - m1.getLedgerId(), m2.getLedgerId()); - assertEquals("expected " + m1 + " but got " + m2, - m1.getFirstTxId(), m2.getFirstTxId()); - assertEquals("expected " + m1 + " but got " + m2, - m1.getLastTxId(), m2.getLastTxId()); - assertEquals("expected " + m1 + " but got " + m2, - m1.getLastDLSN(), m2.getLastDLSN()); - assertEquals("expected " + m1 + " but got " + m2, - m1.getRecordCount(), m2.getRecordCount()); - assertEquals("expected " + m1 + " but got " + m2, - m1.isInProgress(), m2.isInProgress()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java new file mode 100644 index 0000000..456ed68 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/logsegment/TestPerStreamLogSegmentCache.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.logsegment; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.twitter.distributedlog.DLMTestUtil; +import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.*; + +/** + * Test Case for Per Stream Log Segment Cache. + */ +public class TestPerStreamLogSegmentCache { + + @Test(timeout = 60000) + public void testBasicOperations() { + LogSegmentMetadata metadata = + DLMTestUtil.completedLogSegment("/segment1", 1L, 1L, 100L, 100, 1L, 99L, 0L); + String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L); + + PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-basic-operations"); + assertNull("No log segment " + name + " should be cached", cache.get(name)); + cache.add(name, metadata); + LogSegmentMetadata metadataRetrieved = cache.get(name); + assertNotNull("log segment " + name + " should be cached", metadataRetrieved); + assertEquals("Wrong log segment metadata returned for " + name, + metadata, metadataRetrieved); + LogSegmentMetadata metadataRemoved = cache.remove(name); + assertNull("log segment " + name + " should be removed from cache", cache.get(name)); + assertEquals("Wrong log segment metadata removed for " + name, + metadata, metadataRemoved); + assertNull("No log segment " + name + " to be removed", cache.remove(name)); + } + + @Test(timeout = 60000) + public void testDiff() { + PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-diff"); + // add 5 completed log segments + for (int i = 1; i <= 5; i++) { + LogSegmentMetadata metadata = + DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L); + String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i); + cache.add(name, metadata); + } + // add one inprogress log segment + LogSegmentMetadata inprogress = + DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6); + String name = DLMTestUtil.inprogressZNodeName(6); + cache.add(name, inprogress); + + // deleted first 2 completed log segments and completed the last one + Set<String> segmentRemoved = Sets.newHashSet(); + for (int i = 1; i <= 2; i++) { + segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); + } + segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6))); + Set<String> segmentReceived = Sets.newHashSet(); + Set<String> segmentAdded = Sets.newHashSet(); + for (int i = 3; i <= 6; i++) { + segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); + if (i == 6) { + segmentAdded.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); + } + } + + Pair<Set<String>, Set<String>> segmentChanges = cache.diff(segmentReceived); + assertTrue("Should remove " + segmentRemoved + ", but removed " + segmentChanges.getRight(), + Sets.difference(segmentRemoved, segmentChanges.getRight()).isEmpty()); + assertTrue("Should add " + segmentAdded + ", but added " + segmentChanges.getLeft(), + Sets.difference(segmentAdded, segmentChanges.getLeft()).isEmpty()); + } + + @Test(timeout = 60000) + public void testUpdate() { + PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-update"); + // add 5 completed log segments + for (int i = 1; i <= 5; i++) { + LogSegmentMetadata metadata = + DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L); + String name = DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i); + cache.add(name, metadata); + } + // add one inprogress log segment + LogSegmentMetadata inprogress = + DLMTestUtil.inprogressLogSegment("/inprogress-6", 6, 600L, 6); + String name = DLMTestUtil.inprogressZNodeName(6); + cache.add(name, inprogress); + + // deleted first 2 completed log segments and completed the last one + Set<String> segmentRemoved = Sets.newHashSet(); + for (int i = 1; i <= 2; i++) { + segmentRemoved.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); + } + segmentRemoved.add((DLMTestUtil.inprogressZNodeName(6))); + Set<String> segmentReceived = Sets.newHashSet(); + Map<String, LogSegmentMetadata> segmentAdded = Maps.newHashMap(); + for (int i = 3; i <= 6; i++) { + segmentReceived.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i)); + if (i == 6) { + segmentAdded.put(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(i), + DLMTestUtil.completedLogSegment("/segment" + i, i, i, i * 100L, 100, i, 99L, 0L)); + } + } + + // update the cache + cache.update(segmentRemoved, segmentAdded); + for (String segment : segmentRemoved) { + assertNull("Segment " + segment + " should be removed.", cache.get(segment)); + } + for (String segment : segmentReceived) { + assertNotNull("Segment " + segment + " should not be removed", cache.get(segment)); + } + for (Map.Entry<String, LogSegmentMetadata> entry : segmentAdded.entrySet()) { + assertEquals("Segment " + entry.getKey() + " should be added.", + entry.getValue(), entry.getValue()); + } + } + + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testGapDetection() throws Exception { + PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-gap-detection"); + cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), + DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L)); + cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), + DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L)); + cache.getLogSegments(LogSegmentMetadata.COMPARATOR); + } + + @Test(timeout = 60000) + public void testGapDetectionOnLogSegmentsWithoutLogSegmentSequenceNumber() throws Exception { + PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-gap-detection"); + LogSegmentMetadata segment1 = + DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L) + .mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V1_ORIGINAL).build(); + cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), segment1); + LogSegmentMetadata segment3 = + DLMTestUtil.completedLogSegment("/segment-3", 3L, 3L, 300L, 100, 3L, 99L, 0L) + .mutator().setVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO).build(); + cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(3L), segment3); + List<LogSegmentMetadata> expectedList = Lists.asList(segment1, new LogSegmentMetadata[] { segment3 }); + List<LogSegmentMetadata> resultList = cache.getLogSegments(LogSegmentMetadata.COMPARATOR); + assertEquals(expectedList, resultList); + } + + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testSameLogSegment() throws Exception { + PerStreamLogSegmentCache cache = new PerStreamLogSegmentCache("test-same-log-segment"); + List<LogSegmentMetadata> expectedList = Lists.newArrayListWithExpectedSize(2); + LogSegmentMetadata inprogress = + DLMTestUtil.inprogressLogSegment("/inprogress-1", 1L, 1L, 1L); + expectedList.add(inprogress); + cache.add(DLMTestUtil.inprogressZNodeName(1L), inprogress); + LogSegmentMetadata completed = + DLMTestUtil.completedLogSegment("/segment-1", 1L, 1L, 100L, 100, 1L, 99L, 0L); + expectedList.add(completed); + cache.add(DLMTestUtil.completedLedgerZNodeNameWithLogSegmentSequenceNumber(1L), completed); + + cache.getLogSegments(LogSegmentMetadata.COMPARATOR); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java index f9169c6..cbabf2a 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/tools/TestDistributedLogTool.java @@ -21,6 +21,7 @@ import java.net.URI; import com.twitter.distributedlog.DLMTestUtil; import com.twitter.distributedlog.DLSN; +import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.TestDistributedLogBase; import com.twitter.distributedlog.LocalDLMEmulator; @@ -205,8 +206,11 @@ public class TestDistributedLogTool extends TestDistributedLogBase { @Test(timeout = 60000) public void testToolTruncateStream() throws Exception { - DistributedLogManager dlm = DLMTestUtil.createNewDLM("testToolTruncateStream", conf, defaultUri); - DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 1000); + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setLogSegmentCacheEnabled(false); + DistributedLogManager dlm = DLMTestUtil.createNewDLM("testToolTruncateStream", confLocal, defaultUri); + DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 3, 1000); DLSN dlsn = new DLSN(2,1,0); TruncateStreamCommand cmd = new TruncateStreamCommand(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9ee7d016/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java new file mode 100644 index 0000000..292d135 --- /dev/null +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentNotFoundException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.exceptions; + +import com.twitter.distributedlog.thrift.service.StatusCode; + +/** + * Exception on log segment not found. + */ +public class LogSegmentNotFoundException extends DLException { + + private static final long serialVersionUID = -2482324226595903864L; + + public LogSegmentNotFoundException(String logSegmentPath) { + super(StatusCode.LOG_SEGMENT_NOT_FOUND, "Log Segment " + logSegmentPath + " not found"); + } +}