DL-110: Write a control record if necessary when rolling a log segment

Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/6e507a31
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/6e507a31
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/6e507a31

Branch: refs/heads/master
Commit: 6e507a313341740b7bdc1b7465fe7d7ccb49c3a4
Parents: 3dd39c1
Author: Sijie Guo <sij...@twitter.com>
Authored: Mon Nov 21 16:17:51 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Tue Dec 27 16:49:29 2016 -0800

----------------------------------------------------------------------
 .../twitter/distributedlog/BKAsyncLogWriter.java | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/6e507a31/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
index e96b5af..a6b5fd2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
@@ -121,7 +121,7 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
         public void onSuccess(DLSN value) {
             super.onSuccess(value);
             // roll log segment and issue all pending requests.
-            rollLogSegmentAndIssuePendingRequests(record);
+            rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
         }
 
         @Override
@@ -308,7 +308,7 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
                 result = lastLogRecordInCurrentSegment.promise;
             } else { // no log segment yet. roll the log segment and issue 
pending requests.
                 result = queueRequest(record, flush);
-                rollLogSegmentAndIssuePendingRequests(record);
+                
rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
             }
         } else {
             result = w.asyncWrite(record, flush);
@@ -353,8 +353,8 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
         }
     }
 
-    private void rollLogSegmentAndIssuePendingRequests(LogRecord record) {
-        getLogSegmentWriter(record.getTransactionId(), true, true)
+    private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
+        getLogSegmentWriter(firstTxId, true, true)
                 .addEventListener(new 
FutureEventListener<BKLogSegmentWriter>() {
             @Override
             public void onSuccess(BKLogSegmentWriter writer) {
@@ -365,6 +365,17 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWri
                             writer.asyncWrite(pendingLogRecord.record, 
pendingLogRecord.flush)
                                     .addEventListener(pendingLogRecord);
                         }
+                        // if there are no records in the pending queue, let's 
write a control record
+                        // so that when a new log segment is rolled, a control 
record will be added and
+                        // the corresponding bookies would be able to create 
its ledger.
+                        if (pendingRequests.isEmpty()) {
+                            LogRecord controlRecord = new LogRecord(firstTxId,
+                                    
DistributedLogConstants.CONTROL_RECORD_CONTENT);
+                            controlRecord.setControl();
+                            PendingLogRecord controlReq = new 
PendingLogRecord(controlRecord, false);
+                            writer.asyncWrite(controlReq.record, 
controlReq.flush)
+                                    .addEventListener(controlReq);
+                        }
                         if (null != rollingFuture) {
                             FutureUtils.setValue(rollingFuture, writer);
                         }

Reply via email to