DL-110: Write control record if necessary when roll a log segment
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/6e507a31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/6e507a31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/6e507a31 Branch: refs/heads/master Commit: 6e507a313341740b7bdc1b7465fe7d7ccb49c3a4 Parents: 3dd39c1 Author: Sijie Guo <sij...@twitter.com> Authored: Mon Nov 21 16:17:51 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Tue Dec 27 16:49:29 2016 -0800 ---------------------------------------------------------------------- .../twitter/distributedlog/BKAsyncLogWriter.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/6e507a31/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java index e96b5af..a6b5fd2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java @@ -121,7 +121,7 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri public void onSuccess(DLSN value) { super.onSuccess(value); // roll log segment and issue all pending requests. - rollLogSegmentAndIssuePendingRequests(record); + rollLogSegmentAndIssuePendingRequests(record.getTransactionId()); } @Override @@ -308,7 +308,7 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri result = lastLogRecordInCurrentSegment.promise; } else { // no log segment yet. roll the log segment and issue pending requests. result = queueRequest(record, flush); - rollLogSegmentAndIssuePendingRequests(record); + rollLogSegmentAndIssuePendingRequests(record.getTransactionId()); } } else { result = w.asyncWrite(record, flush); @@ -353,8 +353,8 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri } } - private void rollLogSegmentAndIssuePendingRequests(LogRecord record) { - getLogSegmentWriter(record.getTransactionId(), true, true) + private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) { + getLogSegmentWriter(firstTxId, true, true) .addEventListener(new FutureEventListener<BKLogSegmentWriter>() { @Override public void onSuccess(BKLogSegmentWriter writer) { @@ -365,6 +365,17 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri writer.asyncWrite(pendingLogRecord.record, pendingLogRecord.flush) .addEventListener(pendingLogRecord); } + // if there are no records in the pending queue, let's write a control record + // so that when a new log segment is rolled, a control record will be added and + // the corresponding bookies would be able to create its ledger. + if (pendingRequests.isEmpty()) { + LogRecord controlRecord = new LogRecord(firstTxId, + DistributedLogConstants.CONTROL_RECORD_CONTENT); + controlRecord.setControl(); + PendingLogRecord controlReq = new PendingLogRecord(controlRecord, false); + writer.asyncWrite(controlReq.record, controlReq.flush) + .addEventListener(controlReq); + } if (null != rollingFuture) { FutureUtils.setValue(rollingFuture, writer); }