This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 6bc4d0b KAFKA-13073: Fix MockLog snapshot implementation (#11032)
6bc4d0b is described below
commit 6bc4d0b48146b80e52e6a34a1f5e1aa38c828a1b
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jul 13 17:06:18 2021 -0700
KAFKA-13073: Fix MockLog snapshot implementation (#11032)
Fix a simulation test failure by:
1. Relaxing the validation of the snapshot id against the log start
offset when the state machine attempts to create a new snapshot. It
is safe to just ignore the request instead of throwing an exception
when the snapshot id is less than the log start offset.
2. Fixing the MockLog implementation so that it uses startOffset both
externally and internally.
Reviewers: Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 13 +++--
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 5 +-
raft/config/kraft-log4j.properties | 1 +
.../java/org/apache/kafka/raft/ReplicatedLog.java | 8 +--
.../test/java/org/apache/kafka/raft/MockLog.java | 58 ++++++++++++++--------
.../java/org/apache/kafka/raft/MockLogTest.java | 8 ++-
.../apache/kafka/raft/RaftClientTestContext.java | 4 +-
.../apache/kafka/raft/RaftEventSimulationTest.java | 31 ++++++++----
8 files changed, 74 insertions(+), 54 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 51e95d7..23c8f245 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -235,16 +235,15 @@ final class KafkaMetadataLog private (
}
override def createNewSnapshot(snapshotId: OffsetAndEpoch):
Optional[RawSnapshotWriter] = {
- val highWatermarkOffset = highWatermark.offset
- if (snapshotId.offset > highWatermarkOffset) {
- throw new IllegalArgumentException(
- s"Cannot create a snapshot for an end offset ($endOffset) greater than
the high-watermark ($highWatermarkOffset)"
- )
+ if (snapshotId.offset < startOffset) {
+ info(s"Cannot create a snapshot with an id ($snapshotId) less than the
log start offset ($startOffset)")
+ return Optional.empty()
}
- if (snapshotId.offset < startOffset) {
+ val highWatermarkOffset = highWatermark.offset
+ if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
- s"Cannot create a snapshot for an end offset ($endOffset) less than
the log start offset ($startOffset)"
+ s"Cannot create a snapshot with an id ($snapshotId) greater than the
high-watermark ($highWatermarkOffset)"
)
}
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 5fb4b12..415b694 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -193,10 +193,7 @@ final class KafkaMetadataLogTest {
// Simulate log cleanup that advances the LSO
log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1,
SegmentDeletion)
- assertThrows(
- classOf[IllegalArgumentException],
- () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2,
snapshotId.epoch))
- )
+ assertEquals(Optional.empty(), log.createNewSnapshot(new
OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch)))
}
@Test
diff --git a/raft/config/kraft-log4j.properties
b/raft/config/kraft-log4j.properties
index 87f4fcc..14f739a 100644
--- a/raft/config/kraft-log4j.properties
+++ b/raft/config/kraft-log4j.properties
@@ -21,3 +21,4 @@ log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m
(%c)%n
log4j.appender.stderr.Target=System.err
log4j.logger.org.apache.kafka.raft=INFO
+log4j.logger.org.apache.kafka.snapshot=INFO
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index 1b49b3a..e8b2957 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -236,17 +236,17 @@ public interface ReplicatedLog extends AutoCloseable {
*
* See {@link RawSnapshotWriter} for details on how to use this object.
The caller of
* this method is responsible for invoking {@link
RawSnapshotWriter#close()}. If a
- * snapshot already exists then return an {@link Optional#empty()}.
+ * snapshot already exists or it is less than log start offset then return
an
+ * {@link Optional#empty()}.
*
* Snapshots created using this method will be validated against the
existing snapshots
* and the replicated log.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
- * @return a writable snapshot if it doesn't already exists
+ * @return a writable snapshot if it doesn't already exists and greater
than the log start
+ * offset
* @throws IllegalArgumentException if validate is true and end offset is
greater than the
* high-watermark
- * @throws IllegalArgumentException if validate is true and end offset is
less than the log
- * start offset
*/
Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId);
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index fb2168b..726f184 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -27,11 +27,13 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.snapshot.RawSnapshotReader;
-import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -59,14 +61,20 @@ public class MockLog implements ReplicatedLog {
private final NavigableMap<OffsetAndEpoch, MockRawSnapshotReader>
snapshots = new TreeMap<>();
private final TopicPartition topicPartition;
private final Uuid topicId;
+ private final Logger logger;
private long nextId = ID_GENERATOR.getAndIncrement();
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0,
Optional.empty());
private long lastFlushedOffset = 0;
- public MockLog(TopicPartition topicPartition, Uuid topicId) {
+ public MockLog(
+ TopicPartition topicPartition,
+ Uuid topicId,
+ LogContext logContext
+ ) {
this.topicPartition = topicPartition;
this.topicId = topicId;
+ this.logger = logContext.logger(MockLog.class);
}
@Override
@@ -218,17 +226,25 @@ public class MockLog implements ReplicatedLog {
@Override
public LogOffsetMetadata endOffset() {
- long nextOffset = lastEntry().map(entry -> entry.offset +
1).orElse(logStartOffset());
+ long nextOffset = lastEntry()
+ .map(entry -> entry.offset + 1)
+ .orElse(
+ latestSnapshotId()
+ .map(id -> id.offset)
+ .orElse(0L)
+ );
return new LogOffsetMetadata(nextOffset, Optional.of(new
MockOffsetMetadata(nextId)));
}
@Override
public long startOffset() {
- return firstEntry().map(entry ->
entry.offset).orElse(logStartOffset());
- }
-
- private long logStartOffset() {
- return earliestSnapshotId().map(id -> id.offset).orElse(0L);
+ return firstEntry()
+ .map(entry -> entry.offset)
+ .orElse(
+ earliestSnapshotId()
+ .map(id -> id.offset)
+ .orElse(0L)
+ );
}
private List<LogEntry> buildEntries(RecordBatch batch, Function<Record,
Long> offsetSupplier) {
@@ -420,6 +436,16 @@ public class MockLog implements ReplicatedLog {
@Override
public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch
snapshotId) {
+ if (snapshotId.offset < startOffset()) {
+ logger.info(
+ "Cannot create a snapshot with an id ({}) less than the log
start offset ({})",
+ snapshotId,
+ startOffset()
+ );
+
+ return Optional.empty();
+ }
+
long highWatermarkOffset = highWatermark().offset;
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
@@ -431,16 +457,6 @@ public class MockLog implements ReplicatedLog {
);
}
- if (snapshotId.offset < logStartOffset()) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot create a snapshot with and id (%s) less than the
log start offset (%s)",
- snapshotId,
- logStartOffset()
- )
- );
- }
-
ValidOffsetAndEpoch validOffsetAndEpoch =
validateOffsetAndEpoch(snapshotId.offset, snapshotId.epoch);
if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) {
throw new IllegalArgumentException(
@@ -490,12 +506,12 @@ public class MockLog implements ReplicatedLog {
@Override
public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
- if (logStartOffset() > snapshotId.offset) {
+ if (startOffset() > snapshotId.offset) {
throw new OffsetOutOfRangeException(
String.format(
"New log start (%s) is less than the curent log start
offset (%s)",
snapshotId,
- logStartOffset()
+ startOffset()
)
);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 404c8d46..1b2caca 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
@@ -57,7 +58,7 @@ public class MockLogTest {
@BeforeEach
public void setup() {
- log = new MockLog(topicPartition, topicId);
+ log = new MockLog(topicPartition, topicId, new LogContext());
}
@AfterEach
@@ -510,10 +511,7 @@ public class MockLogTest {
assertTrue(log.deleteBeforeSnapshot(snapshotId));
assertEquals(snapshotId.offset, log.startOffset());
- assertThrows(
- IllegalArgumentException.class,
- () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords -
1, epoch))
- );
+ assertEquals(Optional.empty(), log.createNewSnapshot(new
OffsetAndEpoch(numberOfRecords - 1, epoch)));
}
@Test
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 2f644e4..7bffeb8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -128,7 +128,8 @@ public final class RaftClientTestContext {
private final MockTime time = new MockTime();
private final QuorumStateStore quorumStateStore = new
MockQuorumStateStore();
private final Random random = Mockito.spy(new Random(1));
- private final MockLog log = new MockLog(METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID);
+ private final LogContext logContext = new LogContext();
+ private final MockLog log = new MockLog(METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID, logContext);
private final Set<Integer> voters;
private final OptionalInt localId;
@@ -222,7 +223,6 @@ public final class RaftClientTestContext {
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters);
- LogContext logContext = new LogContext();
MockListener listener = new MockListener(localId);
Map<Integer, RaftConfig.AddressSpec> voterAddressMap =
voters.stream()
.collect(Collectors.toMap(id -> id,
RaftClientTestContext::mockAddress));
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 739785a..d69fcc1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft;
+import net.jqwik.api.AfterFailureMode;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.Tag;
@@ -103,7 +104,7 @@ public class RaftEventSimulationTest {
private static final int FETCH_MAX_WAIT_MS = 100;
private static final int LINGER_MS = 0;
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectInitialLeader(
@ForAll int seed,
@ForAll @IntRange(min = 1, max = 5) int numVoters,
@@ -122,7 +123,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
}
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectNewLeaderAfterOldLeaderFailure(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@@ -162,7 +163,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark
+ 10));
}
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRecoverAfterAllNodesKilled(
@ForAll int seed,
@ForAll @IntRange(min = 1, max = 5) int numVoters,
@@ -195,7 +196,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark
+ 10));
}
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectNewLeaderAfterOldLeaderPartitionedAway(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@@ -227,7 +228,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(20,
nonPartitionedNodes));
}
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canMakeProgressIfMajorityIsReachable(
@ForAll int seed,
@ForAll @IntRange(min = 0, max = 3) int numObservers
@@ -272,7 +273,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
}
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canMakeProgressAfterBackToBackLeaderFailures(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@@ -305,7 +306,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() ->
cluster.anyReachedHighWatermark(targetHighWatermark));
}
- @Property(tries = 100)
+ @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRecoverFromSingleNodeCommittedDataLoss(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@@ -498,7 +499,15 @@ public class RaftEventSimulationTest {
private static class PersistentState {
final MockQuorumStateStore store = new MockQuorumStateStore();
- final MockLog log = new MockLog(METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID);
+ final MockLog log;
+
+ PersistentState(int nodeId) {
+ log = new MockLog(
+ METADATA_PARTITION,
+ Uuid.METADATA_TOPIC_ID,
+ new LogContext(String.format("[Node %s] ", nodeId))
+ );
+ }
}
private static class Cluster {
@@ -516,11 +525,11 @@ public class RaftEventSimulationTest {
int nodeId = 0;
for (; nodeId < numVoters; nodeId++) {
voters.add(nodeId);
- nodes.put(nodeId, new PersistentState());
+ nodes.put(nodeId, new PersistentState(nodeId));
}
for (; nodeId < numVoters + numObservers; nodeId++) {
- nodes.put(nodeId, new PersistentState());
+ nodes.put(nodeId, new PersistentState(nodeId));
}
}
@@ -674,7 +683,7 @@ public class RaftEventSimulationTest {
void killAndDeletePersistentState(int nodeId) {
kill(nodeId);
- nodes.put(nodeId, new PersistentState());
+ nodes.put(nodeId, new PersistentState(nodeId));
}
private static RaftConfig.AddressSpec nodeAddress(int id) {