Repository: incubator-distributedlog Updated Branches: refs/heads/master 40df29d9b -> b4150fc84
DL-87: Introduce periodic keepalive control record in writer merge twitter's change from Leigh Stewart Author: Jordan Bull <jb...@twitter.com> Author: Sijie Guo <sij...@twitter.com> Author: Leigh Stewart <lstew...@twitter.com> Reviewers: Leigh Stewart <lstew...@apache.org> Closes #59 from sijie/merge/DL-87 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b4150fc8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b4150fc8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b4150fc8 Branch: refs/heads/master Commit: b4150fc842e63ea43dc648df21c0c2d00489282b Parents: 40df29d Author: Jordan Bull <jb...@twitter.com> Authored: Fri Dec 16 22:43:24 2016 -0800 Committer: Sijie Guo <si...@apache.org> Committed: Fri Dec 16 22:43:24 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKLogSegmentWriter.java | 47 ++++++++++++++++- .../DistributedLogConfiguration.java | 25 +++++++++ .../distributedlog/DistributedLogConstants.java | 1 + .../distributedlog/TestAsyncReaderWriter.java | 53 ++++++++++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java index 004b2fb..1b52951 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java @@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction1; @@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private long numFlushesSinceRestart = 0; private long numBytes = 0; private long lastEntryId = Long.MIN_VALUE; + private long lastTransmitNanos = Long.MIN_VALUE; + private final int periodicKeepAliveMs; // Indicates whether there are writes that have been successfully transmitted that would need // a control record to be transmitted to make them visible to the readers by updating the last @@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private int minDelayBetweenImmediateFlushMs = 0; private Stopwatch lastTransmit; private boolean streamEnded = false; - private ScheduledFuture<?> periodicFlushSchedule = null; + private final ScheduledFuture<?> periodicFlushSchedule; + private final ScheduledFuture<?> periodicKeepAliveSchedule; final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null); final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null); final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null); @@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz if (periodicFlushFrequency > 0 && scheduler != null) { periodicFlushSchedule = scheduler.scheduleAtFixedRate(this, periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS); + } else { + periodicFlushSchedule = null; } } else { // Min delay heuristic applies only when immediate flush is enabled // and transmission threshold is zero minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs(); + periodicFlushSchedule = null; + } + this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds(); + if (periodicKeepAliveMs > 0 && scheduler != null) { + periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + keepAlive(); + } + }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS); + } else { + periodicKeepAliveSchedule = null; } this.conf = conf; @@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz private void closeInternal(final boolean abort, final AtomicReference<Throwable> throwExc, final Promise<Void> closePromise) { + // Cancel the periodic keep alive schedule first + if (null != periodicKeepAliveSchedule) { + if (!periodicKeepAliveSchedule.cancel(false)) { + LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment()); + } + } + // Cancel the periodic flush schedule first // The task is allowed to exit gracefully if (null != periodicFlushSchedule) { @@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } synchronized (this) { + // update the transmit timestamp + lastTransmitNanos = MathUtils.nowInNano(); + BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit); packetPrevious = packet; entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(), @@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz } } + synchronized private void keepAlive() { + if (null != closeFuture) { + // if the log segment is closing, skip sending any keep alive records. + LOG.debug("Skip sending keepAlive control record since log segment {} is closing.", + getFullyQualifiedLogSegment()); + return; + } + + if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) { + return; + } + + LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT); + controlRec.setControl(); + asyncWrite(controlRec); + } + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index d2af862..c2057df 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration { public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false; public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds"; public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0; + public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds"; + public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0; // Retention/Truncation Settings public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours"; @@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends CompositeConfiguration { return this; } + /** + * Get Periodic Keep Alive Frequency in milliseconds. + * <p>If the setting is set with a positive value, it would periodically write a control record + * to keep the stream active. The default value is 0. + * + * @return periodic keep alive frequency in milliseconds. + */ + public int getPeriodicKeepAliveMilliSeconds() { + return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT); + } + + /** + * Set Periodic Keep Alive Frequency in milliseconds. + * + * @param keepAliveMs keep alive frequency in milliseconds. + * @return distributedlog configuration + * @see #getPeriodicKeepAliveMilliSeconds() + */ + public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) { + setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs); + return this; + } + // // DL Retention/Truncation Settings // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java index 5c50282..32def94 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java @@ -58,6 +58,7 @@ public class DistributedLogConstants { public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8); // An ACL that gives all permissions to node creators and read permissions only to everyone else. public static final List<ACL> EVERYONE_READ_CREATOR_ALL = http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 8011a04..a4832b0 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } } } + + @Test(timeout = 60000) + public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception { + String name = runtime.getMethodName(); + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(testConf); + confLocal.setOutputBufferSize(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setPeriodicKeepAliveMilliSeconds(0); + confLocal.setReaderIdleWarnThresholdMillis(20); + confLocal.setReaderIdleErrorThresholdMillis(40); + + URI uri = createDLMURI("/" + name); + ensureURICreated(uri); + + DistributedLogManager dlm = createNewDLM(confLocal, name); + BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); + writer.write(DLMTestUtil.getLogRecordInstance(1L)); + + AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); + try { + FutureUtils.result(reader.readNext()); + fail("Should fail when stream is idle"); + } catch (IdleReaderException ire) { + // expected + } + } + + @Test(timeout = 60000) + public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception { + String name = runtime.getMethodName(); + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(testConf); + confLocal.setOutputBufferSize(0); + confLocal.setImmediateFlushEnabled(false); + confLocal.setPeriodicFlushFrequencyMilliSeconds(0); + confLocal.setPeriodicKeepAliveMilliSeconds(1000); + confLocal.setReaderIdleWarnThresholdMillis(2000); + confLocal.setReaderIdleErrorThresholdMillis(4000); + + URI uri = createDLMURI("/" + name); + ensureURICreated(uri); + + DistributedLogManager dlm = createNewDLM(confLocal, name); + BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); + writer.write(DLMTestUtil.getLogRecordInstance(1L)); + + AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); + LogRecordWithDLSN record = FutureUtils.result(reader.readNext()); + assertEquals(1L, record.getTransactionId()); + DLMTestUtil.verifyLogRecord(record); + } }