wuchong commented on code in PR #1386:
URL: https://github.com/apache/fluss/pull/1386#discussion_r2267107815
##########
fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java:
##########
@@ -83,8 +88,9 @@ public WriterStateEntry toEntry() {
return updatedEntry;
}
- private boolean inSequence(int lastBatchSeq, int nextBatchSeq) {
- return nextBatchSeq == lastBatchSeq + 1L
+ private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean isBatchExpired) {
+ return (lastBatchSeq == NO_BATCH_SEQUENCE && isBatchExpired)
Review Comment:
Add a comment to explain this statement; it is the core change of the logic.
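For reference, here is a standalone sketch of what such a comment could explain. The names and the wrap-around clause are assumptions inferred from the surrounding diff (`NO_BATCH_SEQUENCE` as a `-1` sentinel, Kafka-style sequence wrap at `Integer.MAX_VALUE`), not the real Fluss sources:

```java
public class SequenceCheckSketch {
    // Assumed sentinel meaning "no batch sequence recorded for this writer".
    static final int NO_BATCH_SEQUENCE = -1;

    static boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean isBatchExpired) {
        // Core change: if the writer has no recorded sequence AND the incoming
        // batch is already expired, accept it instead of throwing
        // OutOfOrderSequenceException, because the writer state may simply have
        // been cleaned up by expiration rather than lost out of order.
        return (lastBatchSeq == NO_BATCH_SEQUENCE && isBatchExpired)
                // Normal case: the next batch directly follows the last one.
                || nextBatchSeq == lastBatchSeq + 1L
                // Assumed wrap-around clause at Integer.MAX_VALUE.
                || (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(inSequence(NO_BATCH_SEQUENCE, 10, true));  // true
        System.out.println(inSequence(NO_BATCH_SEQUENCE, 10, false)); // false
        System.out.println(inSequence(4, 5, false));                  // true
    }
}
```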
##########
fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java:
##########
@@ -179,36 +182,94 @@ void testFetchSnapshotEmptySnapshot() {
@Test
void testRemoveExpiredWritersOnReload() throws IOException {
- append(stateManager, writerId, 0, 0L, 0);
- append(stateManager, writerId, 1, 1L, 1);
+ append(stateManager, writerId, 0, 0L, false, 0);
+ append(stateManager, writerId, 1, 1L, false, 1);
stateManager.takeSnapshot();
WriterStateManager recoveredMapping =
new WriterStateManager(
tableBucket,
logDir,
(int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
- recoveredMapping.truncateAndReload(0L, 1L, 70000);
+ recoveredMapping.truncateAndReload(0L, 1L);
// Entry added after recovery. The writer id should be expired now, and would not exist in
// the writer mapping. If writing with the same writerId and non-zero batch sequence, the
// OutOfOrderSequenceException will throw. If you want to continue to write, you need to get
// a new writer id.
- assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L, 70001))
+ assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L, false, 70001))
.isInstanceOf(OutOfOrderSequenceException.class)
.hasMessageContaining(
"Out of order batch sequence for writer 1 at offset 2 in "
+ "table-bucket TableBucket{tableId=1001, bucket=0}"
+ " : 2 (incoming batch seq.), -1 (current batch seq.)");
- append(recoveredMapping, 2L, 0, 2L, 70002);
+ append(recoveredMapping, 2L, 0, 2L, false, 70002);
assertThat(recoveredMapping.activeWriters().size()).isEqualTo(1);
assertThat(recoveredMapping.activeWriters().values().iterator().next().lastBatchSequence())
.isEqualTo(0);
assertThat(recoveredMapping.mapEndOffset()).isEqualTo(3L);
}
+ @Test
+ void testLoadFromSnapshotRetainExpiredWriters() throws Exception {
+ ManualClock clock = new ManualClock(1000L);
+
+ // 2 seconds to expire the writer.
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofSeconds(2));
+ WriterStateManager stateManager1 =
+ new WriterStateManager(
+ tableBucket,
+ logDir,
+ (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+
+ long writerId1 = 1L;
+ long writerId2 = 2L;
+
+ append(stateManager1, writerId1, 0, 0L, false, clock.milliseconds());
+ append(stateManager1, writerId2, 0, 1L, false, clock.milliseconds());
+ stateManager1.takeSnapshot();
+ assertThat(stateManager1.activeWriters().size()).isEqualTo(2);
+
+ // advance the clock by 5000ms so the batches become expired.
+ clock.advanceTime(5000L, TimeUnit.MILLISECONDS);
+ // create a new WriterStateManager to mock a tabletServer restart and reload from snapshot.
+ WriterStateManager stateManager2 =
+ new WriterStateManager(
+ tableBucket,
+ logDir,
+ (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+ stateManager2.truncateAndReload(0L, 2L);
+ // all writers are retained.
+ assertThat(stateManager2.activeWriters().size()).isEqualTo(2);
+ }
+
+ @Test
+ void testAppendAnExpiredBatchWithEmptyWriterStatus() throws Exception {
+ ManualClock clock = new ManualClock(5000L);
+
+ // 2 seconds to expire the writer.
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofSeconds(2));
+ WriterStateManager stateManager1 =
+ new WriterStateManager(
+ tableBucket,
+ logDir,
+ (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+
+ // If we try to append an expired batch with a non-zero batch sequence, the
+ // OutOfOrderSequenceException will not be thrown.
+ append(stateManager1, 1L, 10, 10L, true, clock.milliseconds());
Review Comment:
It would be better to add a check after this that `activeWriters` in the state manager has size 1 and that the last batch sequence is 10 (i.e. the new batch sequence).
##########
fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java:
##########
@@ -419,6 +414,10 @@ private boolean isWriterExpired(long currentTimeMs, WriterStateEntry writerState
return currentTimeMs - writerStateEntry.lastBatchTimestamp() > writerExpirationMs;
}
+ public boolean isBatchExpired(long currentTimeMs, LogRecordBatch recordBatch) {
Review Comment:
How about renaming it to `isWriterInBatchExpired`, which I think is clearer? `isBatchExpired` sounds like the batch itself is expired, but we don't have that concept.
##########
fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java:
##########
@@ -1115,7 +1115,8 @@ private static void updateWriterAppendInfo(
// update writers.
WriterAppendInfo appendInfo =
writers.computeIfAbsent(writerId, id -> writerStateManager.prepareUpdate(writerId));
- appendInfo.append(batch);
+ appendInfo.append(
+ batch, writerStateManager.isBatchExpired(System.currentTimeMillis(), batch));
Review Comment:
Can we move the calculation of `writerStateManager.isBatchExpired` into `loadWritersFromRecords`? That would avoid calling `System.currentTimeMillis()` for every record batch, and would restrict the `isBatchExpired` check to restoring state from records.
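A minimal sketch of that suggestion, using hypothetical stand-in types (`RecordBatchStub`, `countExpired`) rather than the real Fluss API: the clock is read once per recovery pass and the per-batch check becomes pure arithmetic.

```java
import java.util.List;

public class ExpiryCheckSketch {
    // Hypothetical stand-in for LogRecordBatch: only the last timestamp matters here.
    record RecordBatchStub(long lastTimestamp) {}

    static int countExpired(List<RecordBatchStub> batches, long writerExpirationMs, long currentTimeMs) {
        // currentTimeMs is read once by the caller (e.g. at the start of a
        // loadWritersFromRecords-style recovery pass) instead of once per batch.
        int expired = 0;
        for (RecordBatchStub batch : batches) {
            if (currentTimeMs - batch.lastTimestamp() > writerExpirationMs) {
                expired++;
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        List<RecordBatchStub> batches =
                List.of(new RecordBatchStub(9_500L), new RecordBatchStub(1_000L));
        // With a 2s expiration, only the batch from t=1000 is expired.
        System.out.println(countExpired(batches, 2_000L, now)); // 1
    }
}
```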
##########
fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java:
##########
@@ -179,36 +182,94 @@ void testFetchSnapshotEmptySnapshot() {
@Test
void testRemoveExpiredWritersOnReload() throws IOException {
- append(stateManager, writerId, 0, 0L, 0);
- append(stateManager, writerId, 1, 1L, 1);
+ append(stateManager, writerId, 0, 0L, false, 0);
+ append(stateManager, writerId, 1, 1L, false, 1);
stateManager.takeSnapshot();
WriterStateManager recoveredMapping =
new WriterStateManager(
tableBucket,
logDir,
(int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
- recoveredMapping.truncateAndReload(0L, 1L, 70000);
+ recoveredMapping.truncateAndReload(0L, 1L);
// Entry added after recovery. The writer id should be expired now, and would not exist in
// the writer mapping. If writing with the same writerId and non-zero batch sequence, the
// OutOfOrderSequenceException will throw. If you want to continue to write, you need to get
// a new writer id.
- assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L, 70001))
+ assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L, false, 70001))
.isInstanceOf(OutOfOrderSequenceException.class)
.hasMessageContaining(
"Out of order batch sequence for writer 1 at offset 2 in "
+ "table-bucket TableBucket{tableId=1001, bucket=0}"
+ " : 2 (incoming batch seq.), -1 (current batch seq.)");
- append(recoveredMapping, 2L, 0, 2L, 70002);
+ append(recoveredMapping, 2L, 0, 2L, false, 70002);
assertThat(recoveredMapping.activeWriters().size()).isEqualTo(1);
assertThat(recoveredMapping.activeWriters().values().iterator().next().lastBatchSequence())
.isEqualTo(0);
assertThat(recoveredMapping.mapEndOffset()).isEqualTo(3L);
}
+ @Test
+ void testLoadFromSnapshotRetainExpiredWriters() throws Exception {
+ ManualClock clock = new ManualClock(1000L);
+
+ // 2 seconds to expire the writer.
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofSeconds(2));
+ WriterStateManager stateManager1 =
+ new WriterStateManager(
+ tableBucket,
+ logDir,
+ (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+
+ long writerId1 = 1L;
+ long writerId2 = 2L;
+
+ append(stateManager1, writerId1, 0, 0L, false, clock.milliseconds());
+ append(stateManager1, writerId2, 0, 1L, false, clock.milliseconds());
+ stateManager1.takeSnapshot();
+ assertThat(stateManager1.activeWriters().size()).isEqualTo(2);
+
+ // advance the clock by 5000ms so the batches become expired.
+ clock.advanceTime(5000L, TimeUnit.MILLISECONDS);
+ // create a new WriterStateManager to mock a tabletServer restart and reload from snapshot.
+ WriterStateManager stateManager2 =
+ new WriterStateManager(
+ tableBucket,
+ logDir,
+ (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+ stateManager2.truncateAndReload(0L, 2L);
+ // all writers are retained.
+ assertThat(stateManager2.activeWriters().size()).isEqualTo(2);
Review Comment:
Why retain expired writers after reloading the snapshot? IIUC, the retained expired writers will soon be cleaned up by the `PeriodicWriterIdExpirationCheck` thread, and I think the new implementation can tolerate expired writers now.
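To illustrate why retaining them at reload is harmless, here is a toy sketch of such a periodic cleanup, with a plain map standing in for the writer state; the names are illustrative, not the real Fluss classes:

```java
import java.util.HashMap;
import java.util.Map;

public class PeriodicExpirationSketch {
    // writerId -> lastBatchTimestamp; a toy stand-in for the writer state map.
    static void removeExpired(Map<Long, Long> writers, long currentTimeMs, long writerExpirationMs) {
        writers.values().removeIf(ts -> currentTimeMs - ts > writerExpirationMs);
    }

    public static void main(String[] args) {
        Map<Long, Long> writers = new HashMap<>();
        writers.put(1L, 1_000L);
        writers.put(2L, 1_000L);
        // Both writers survive the reload; the next periodic check at t=6000
        // with a 2s expiration drops them, so nothing stays stale for long.
        removeExpired(writers, 6_000L, 2_000L);
        System.out.println(writers.size()); // 0
    }
}
```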
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]