This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 533dd5d [DLOG] Delete empty inprogress segment on recovery
533dd5d is described below
commit 533dd5d6e9d3ac9b24d2462f6ae4d4240a4bffdf
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Jun 20 14:36:11 2019 +0200
[DLOG] Delete empty inprogress segment on recovery
This addresses an issue whereby if a user is using getLastTxId() to
find the first txnid when starting to write, they will continue to be
able to write if the last segment written by the previous writer was
empty. If the last segment was empty, then the maxTxId would be higher
than the result of getLastTxId(). maxTxId is read from a znode, while
getLastTxId() reads the txid of the last persisted record. In the case
of an empty inprogress segment, the maxTxId znode was being updated
with the expected first transaction id of the segment.
This patch addresses the issue with the following changes:
1. The maxTxId znode is only updated when an inprogress segment is
completed, so its value always refers to a transaction that
exists.
2. On recovery, if the inprogress segment is empty, delete it. There
was a TODO comment to do this already there.
3. When generating the sequence number, allow a potential sequence
number which is equal to the current max sequence number, as this
can be the case where recovery deleted an inprogress empty segment.
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #2099 from ivankelly/txnid-issue
---
.../apache/distributedlog/BKLogWriteHandler.java | 65 ++++++++----
.../distributedlog/TestDistributedLogBase.java | 6 +-
.../java/org/apache/distributedlog/TestTxnId.java | 118 +++++++++++++++++++++
3 files changed, 165 insertions(+), 24 deletions(-)
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index 0ce0a25..3095ec7 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -145,9 +146,24 @@ class BKLogWriteHandler extends BKLogHandler {
segmentList,
recoverLogSegmentFunction,
scheduler
- ).thenApply(GetLastTxIdFunction.INSTANCE);
+ ).thenApply(removeEmptySegments)
+ .thenApply(GetLastTxIdFunction.INSTANCE);
}
};
+ private final Function<List<LogSegmentMetadata>, List<LogSegmentMetadata>>
removeEmptySegments =
+ new Function<List<LogSegmentMetadata>, List<LogSegmentMetadata>>() {
+ @Override
+ public List<LogSegmentMetadata> apply(List<LogSegmentMetadata>
segmentList) {
+ Iterator<LogSegmentMetadata> iter = segmentList.iterator();
+ while (iter.hasNext()) {
+ LogSegmentMetadata segment = iter.next();
+ if (segment == null) {
+ iter.remove();
+ }
+ }
+ return segmentList;
+ }
+ };
// Stats
private final StatsLogger perLogStatsLogger;
@@ -479,7 +495,11 @@ class BKLogWriteHandler extends BKLogHandler {
// no ledger seqno stored in /ledgers before
LOG.info("No max ledger sequence number found while creating log
segment {} for {}.",
logSegmentSeqNo, getFullyQualifiedName());
- } else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 !=
logSegmentSeqNo) {
+ } else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 !=
logSegmentSeqNo // case 1
+ && maxLogSegmentSequenceNo.getSequenceNumber() !=
logSegmentSeqNo) { // case 2
+ // case 1 is the common case, where the new log segment is 1 more
than the previous
+ // case 2 can occur when the writer crashes with an empty in
progress ledger. This is then deleted
+ // on recovery, so the next new segment will have a
matching sequence number
LOG.warn("Unexpected max log segment sequence number {} for {} :
list of cached segments = {}",
new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(),
getFullyQualifiedName(),
getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
@@ -635,10 +655,6 @@ class BKLogWriteHandler extends BKLogHandler {
LOG.debug("Try storing max sequence number in startLogSegment {} :
{}", inprogressZnodePath, logSegmentSeqNo);
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo,
true);
- // Try storing max tx id.
- LOG.debug("Try storing MaxTxId in startLogSegment {} {}",
inprogressZnodePath, txId);
- storeMaxTxId(txn, maxTxId, txId);
-
txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
@Override
@@ -1032,22 +1048,29 @@ class BKLogWriteHandler extends BKLogHandler {
return FutureUtils.exception(new IOException("Unrecoverable
corruption,"
+ " please check logs."));
} else if (endTxId ==
DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
- // TODO: Empty ledger - Ideally we should just remove it?
- endTxId = l.getFirstTxId();
- }
+ LOG.info("Inprogress segment {} is empty, deleting", l);
- CompletableFuture<LogSegmentMetadata> promise = new
CompletableFuture<LogSegmentMetadata>();
- doCompleteAndCloseLogSegment(
- l.getZNodeName(),
- l.getLogSegmentSequenceNumber(),
- l.getLogSegmentId(),
- l.getFirstTxId(),
- endTxId,
- recordCount,
- lastEntryId,
- lastSlotId,
- promise);
- return promise;
+ return deleteLogSegment(l).thenApply(
+ (result) -> {
+ synchronized (inprogressLSSNs) {
+ inprogressLSSNs.remove((Long)
l.getLogSegmentSequenceNumber());
+ }
+ return null;
+ });
+ } else {
+ CompletableFuture<LogSegmentMetadata> promise = new
CompletableFuture<LogSegmentMetadata>();
+ doCompleteAndCloseLogSegment(
+ l.getZNodeName(),
+ l.getLogSegmentSequenceNumber(),
+ l.getLogSegmentId(),
+ l.getFirstTxId(),
+ endTxId,
+ recordCount,
+ lastEntryId,
+ lastSlotId,
+ promise);
+ return promise;
+ }
}
}
diff --git
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index 903e684..16565a8 100644
---
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -95,12 +95,12 @@ public class TestDistributedLogBase {
protected static String zkServers;
protected static int zkPort;
protected static int numBookies = 3;
- private static final List<File> tmpDirs = new ArrayList<File>();
+ protected static final List<File> TMP_DIRS = new ArrayList<File>();
@BeforeClass
public static void setupCluster() throws Exception {
File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
- tmpDirs.add(zkTmpDir);
+ TMP_DIRS.add(zkTmpDir);
Pair<ZooKeeperServerShim, Integer> serverAndPort =
LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
zks = serverAndPort.getLeft();
zkPort = serverAndPort.getRight();
@@ -125,7 +125,7 @@ public class TestDistributedLogBase {
public static void teardownCluster() throws Exception {
bkutil.teardown();
zks.stop();
- for (File dir : tmpDirs) {
+ for (File dir : TMP_DIRS) {
FileUtils.forceDeleteOnExit(dir);
}
}
diff --git
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestTxnId.java
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestTxnId.java
new file mode 100644
index 0000000..d5f4115
--- /dev/null
+++
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestTxnId.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test cases for transaction id tracking across writer crash and recovery.
+ */
+public class TestTxnId extends TestDistributedLogBase {
+ private static final Logger logger =
LoggerFactory.getLogger(TestRollLogSegments.class);
+
+ @Test
+ public void testRecoveryAfterBookieCrash() throws Exception {
+ String name = "txnid-after-crash";
+ DistributedLogConfiguration conf = new DistributedLogConfiguration()
+ .setEnsembleSize(5)
+ .setWriteQuorumSize(5)
+ .setAckQuorumSize(5)
+ .setLogSegmentRollingIntervalMinutes(0)
+ .setLogSegmentRollingConcurrency(-1)
+ .setMaxLogSegmentBytes(400000);
+
+ long entryId = 0;
+ List<BookieServer> extraBookies = new ArrayList<>();
+ try {
+ extraBookies.add(startExtraBookie());
+ extraBookies.add(startExtraBookie());
+
+ try (BKDistributedLogManager dlm = (BKDistributedLogManager)
createNewDLM(conf, name);
+ BKAsyncLogWriter writer =
dlm.startAsyncLogSegmentNonPartitioned()) {
+ writer.write(DLMTestUtil.getLogRecordInstance(1,
100000)).join();
+ writer.write(DLMTestUtil.getLogRecordInstance(2,
100000)).join();
+
+ extraBookies.forEach(b -> b.shutdown());
+
+ try {
+ writer.write(DLMTestUtil.getLogRecordInstance(3,
100000)).join();
+ Assert.fail("Shouldn't have succeeded");
+ } catch (Exception e) {
+ // expected
+ }
+
+ writer.write(DLMTestUtil.getLogRecordInstance(4,
100000)).join();
+ Assert.fail("Shouldn't be able to write");
+ } catch (Exception e) {
+ // expected
+ }
+
+ extraBookies.add(startExtraBookie());
+ extraBookies.add(startExtraBookie());
+
+ try (BKDistributedLogManager dlm = (BKDistributedLogManager)
createNewDLM(conf, name);
+ BKAsyncLogWriter writer =
dlm.startAsyncLogSegmentNonPartitioned()) {
+ long firstTxid = dlm.getLastTxId() + 1;
+ for (int i = 0; i < 20; i++) {
+ logger.info("Writing entry {}", i);
+ writer.write(DLMTestUtil.getLogRecordInstance(firstTxid +
i, 100000)).join();
+ Thread.sleep(100);
+ }
+ }
+ } finally {
+ extraBookies.forEach(b -> b.shutdown());
+ }
+ }
+
+ private BookieServer startExtraBookie() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+ TMP_DIRS.add(journalDir);
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+ TMP_DIRS.add(ledgerDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setMetadataServiceUri("zk://" + zkServers + "/ledgers");
+ conf.setBookiePort(0);
+ conf.setDiskUsageThreshold(0.99f);
+ conf.setAllowLoopback(true);
+ conf.setJournalDirName(journalDir.getPath());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ BookieServer server = new BookieServer(conf, new
NullStatsProvider().getStatsLogger(""));
+ server.start();
+
+ while (!server.isRunning()) {
+ Thread.sleep(10);
+ }
+ return server;
+ }
+}