DL-158: update truncation status for all completed log segments update turncation status for all completed log segments, when purge truncated log segments we will leave one complete log segment.
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/98a29a58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/98a29a58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/98a29a58 Branch: refs/heads/master Commit: 98a29a5807fd7d42d2bb765be84527d4b2fb5dbc Parents: 0591d06 Author: Yiming Zang <yz...@twitter.com> Authored: Wed Dec 28 14:49:50 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:10:53 2016 -0800 ---------------------------------------------------------------------- .../java/com/twitter/distributedlog/BKLogWriteHandler.java | 9 ++++----- .../test/java/com/twitter/distributedlog/TestTruncate.java | 6 +++--- .../service/TestDistributedLogServerBase.java | 8 ++++---- 3 files changed, 11 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98a29a58/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 2e31ac8..f8bc917 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -1049,8 +1049,7 @@ class BKLogWriteHandler extends BKLogHandler { List<LogSegmentMetadata> truncateList = new ArrayList<LogSegmentMetadata>(logSegments.size()); LogSegmentMetadata partialTruncate = null; LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn); - int numCandidates = getNumCandidateLogSegmentsToTruncate(logSegments); - for (int i = 0; i < numCandidates; i++) { + for (int i = 0; i < logSegments.size(); i++) { LogSegmentMetadata l = logSegments.get(i); if (!l.isInProgress()) { if (l.getLastDLSN().compareTo(dlsn) < 0) { @@ -1075,7 +1074,7 @@ class BKLogWriteHandler extends BKLogHandler { return setLogSegmentTruncationStatus(truncateList, partialTruncate, dlsn); } - private int getNumCandidateLogSegmentsToTruncate(List<LogSegmentMetadata> logSegments) { + private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> logSegments) { if (logSegments.isEmpty()) { return 0; } else { @@ -1104,7 +1103,7 @@ class BKLogWriteHandler extends BKLogHandler { public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) { List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size()); - int numCandidates = getNumCandidateLogSegmentsToTruncate(logSegments); + int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments); for (int iterator = 0; iterator < numCandidates; iterator++) { LogSegmentMetadata l = logSegments.get(iterator); @@ -1137,7 +1136,7 @@ class BKLogWriteHandler extends BKLogHandler { // we are deleting the log, we can remove whole log segments numLogSegmentsToProcess = logSegments.size(); } else { - numLogSegmentsToProcess = getNumCandidateLogSegmentsToTruncate(logSegments); + numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments); } List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess); for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98a29a58/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java index 70bfeea..98d2020 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java @@ -137,7 +137,7 @@ public class TestTruncate extends TestDistributedLogBase { int txn = 43; DLSN dlsn = txid2DLSN.get((long) txn); assertTrue(Await.result(pair.getRight().truncate(dlsn))); - verifyEntries(name, 1, 31, 20); + verifyEntries(name, 1, 41, 10); Utils.close(pair.getRight()); pair.getLeft().close(); @@ -168,7 +168,7 @@ public class TestTruncate extends TestDistributedLogBase { int txn = 43; DLSN dlsn = txid2DLSN.get((long) txn); assertTrue(Await.result(pair.getRight().truncate(dlsn))); - verifyEntries(name, 1, 31, 20); + verifyEntries(name, 1, 41, 10); Utils.close(pair.getRight()); pair.getLeft().close(); @@ -178,7 +178,7 @@ public class TestTruncate extends TestDistributedLogBase { BKLogWriteHandler handler = dlm.createWriteHandler(true); FutureUtils.result(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE)); - verifyEntries(name, 1, 31, 20); + verifyEntries(name, 1, 41, 10); } @Test(timeout = 60000) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98a29a58/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java index a0853e4..c45e42c 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java @@ -549,7 +549,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT long txid = 1; Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>(); - for (int s = 1; s <= 3; s++) { + for (int s = 1; s <= 2; s++) { for (long i = 1; i <= 10; i++) { long curTxId = txid++; logger.debug("Write entry {} to stream {}.", curTxId, name); @@ -557,12 +557,12 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT ByteBuffer.wrap(("" + curTxId).getBytes())).get(); txid2DLSN.put(curTxId, dlsn); } - if (s <= 2) { + if (s == 1) { dlClient.dlClient.release(name).get(); } } - DLSN dlsnToDelete = txid2DLSN.get(21L); + DLSN dlsnToDelete = txid2DLSN.get(11L); dlClient.dlClient.truncate(name, dlsnToDelete).get(); DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri()); @@ -576,7 +576,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT ++numRead; r = reader.readNext(false); } - assertEquals(20, numRead); + assertEquals(10, numRead); reader.close(); readDLM.close(); }