This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 14d9aaa KAFKA-13113; Support unregistering Raft listeners (#11109)
14d9aaa is described below
commit 14d9aaaca678eb0d8c35494321448f1283768cbe
Author: José Armando García Sancio <[email protected]>
AuthorDate: Fri Jul 23 21:54:44 2021 -0700
KAFKA-13113; Support unregistering Raft listeners (#11109)
This patch adds support for unregistering listeners to `RaftClient`.
Reviewers: Colin P. McCabe <[email protected]>, Jason Gustafson
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/metalog/LocalLogManager.java | 35 +++++--
.../org/apache/kafka/raft/KafkaRaftClient.java | 108 ++++++++++++++++-----
.../java/org/apache/kafka/raft/RaftClient.java | 23 ++++-
.../org/apache/kafka/raft/ValidOffsetAndEpoch.java | 2 +-
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 42 +++++++-
.../java/org/apache/kafka/raft/MockLogTest.java | 29 +++---
7 files changed, 188 insertions(+), 53 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6b93eed..b7a7192 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -71,7 +71,7 @@
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
<suppress checks="JavaNCSS"
-
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
+
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer).java"/>
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index cfb4221..a5e3b2c 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -43,12 +43,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
@@ -418,7 +419,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
/**
* The listener objects attached to this local log manager.
*/
- private final List<MetaLogListenerData> listeners = new ArrayList<>();
+ private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData>
listeners = new IdentityHashMap<>();
/**
* The current leader, as seen by this log manager.
@@ -441,7 +442,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
try {
log.debug("Node {}: running log check.", nodeId);
int numEntriesFound = 0;
- for (MetaLogListenerData listenerData : listeners) {
+ for (MetaLogListenerData listenerData : listeners.values()) {
while (true) {
// Load the snapshot if needed and we are not the
leader
LeaderAndEpoch notifiedLeader =
listenerData.notifiedLeader();
@@ -526,7 +527,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
resign(leader.epoch());
- for (MetaLogListenerData listenerData : listeners) {
+ for (MetaLogListenerData listenerData :
listeners.values()) {
listenerData.beginShutdown();
}
shared.unregisterLogManager(this);
@@ -586,8 +587,12 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
"already been shut down.", nodeId);
future.complete(null);
} else if (initialized) {
- log.info("Node {}: registered MetaLogListener.", nodeId);
- listeners.add(new MetaLogListenerData(listener));
+ int id = System.identityHashCode(listener);
+ if (listeners.putIfAbsent(listener, new
MetaLogListenerData(listener)) != null) {
+ log.error("Node {}: can't register because listener {}
already exists", nodeId, id);
+ } else {
+ log.info("Node {}: registered MetaLogListener {}", nodeId,
id);
+ }
shared.electLeaderIfNeeded();
scheduleLogCheck();
future.complete(null);
@@ -609,6 +614,22 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
@Override
+ public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener)
{
+ eventQueue.append(() -> {
+ if (shutdown) {
+ log.info("Node {}: can't unregister because local log manager
is shutdown", nodeId);
+ } else {
+ int id = System.identityHashCode(listener);
+ if (listeners.remove(listener) == null) {
+ log.error("Node {}: can't unregister because the listener
{} doesn't exists", nodeId, id);
+ } else {
+ log.info("Node {}: unregistered MetaLogListener {}",
nodeId, id);
+ }
+ }
+ });
+ }
+
+ @Override
public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
return scheduleAtomicAppend(epoch, batch);
}
@@ -664,7 +685,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
final
CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new
CompletableFuture<>();
eventQueue.append(() -> {
- future.complete(listeners.stream().map(l ->
l.listener).collect(Collectors.toList()));
+ future.complete(listeners.values().stream().map(l ->
l.listener).collect(Collectors.toList()));
});
try {
return future.get();
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index ffc43a7..8ea7daf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -75,8 +75,8 @@ import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -162,8 +162,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final RequestManager requestManager;
private final RaftMetadataLogCleanerManager snapshotCleaner;
- private final List<ListenerContext> listenerContexts = new ArrayList<>();
- private final ConcurrentLinkedQueue<Listener<T>> pendingListeners = new
ConcurrentLinkedQueue<>();
+ private final Map<Listener<T>, ListenerContext> listenerContexts = new
IdentityHashMap<>();
+ private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations
= new ConcurrentLinkedQueue<>();
/**
* Create a new instance.
@@ -302,7 +302,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void updateListenersProgress(long highWatermark) {
- for (ListenerContext listenerContext : listenerContexts) {
+ for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset
-> {
if (nextExpectedOffset < log.startOffset() &&
nextExpectedOffset < highWatermark) {
SnapshotReader<T> snapshot =
latestSnapshot().orElseThrow(() -> new IllegalStateException(
@@ -335,7 +335,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void maybeFireHandleCommit(long baseOffset, int epoch, long
appendTimestamp, int sizeInBytes, List<T> records) {
- for (ListenerContext listenerContext : listenerContexts) {
+ for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.nextExpectedOffset().ifPresent(nextOffset -> {
if (nextOffset == baseOffset) {
listenerContext.fireHandleCommit(baseOffset, epoch,
appendTimestamp, sizeInBytes, records);
@@ -345,13 +345,13 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void maybeFireLeaderChange(LeaderState<T> state) {
- for (ListenerContext listenerContext : listenerContexts) {
+ for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(),
state.epochStartOffset());
}
}
private void maybeFireLeaderChange() {
- for (ListenerContext listenerContext : listenerContexts) {
+ for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
}
}
@@ -380,11 +380,18 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
@Override
public void register(Listener<T> listener) {
- pendingListeners.add(listener);
+ pendingRegistrations.add(Registration.register(listener));
wakeup();
}
@Override
+ public void unregister(Listener<T> listener) {
+ pendingRegistrations.add(Registration.unregister(listener));
+ // No need to wakeup the polling thread. It is a removal so the
updates can be
+ // delayed until the polling thread wakes up for other reasons.
+ }
+
+ @Override
public LeaderAndEpoch leaderAndEpoch() {
return quorum.leaderAndEpoch();
}
@@ -1021,6 +1028,10 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
return OptionalInt.of(leaderIdOrNil);
}
+ private static String listenerName(Listener<?> listener) {
+ return String.format("%s@%s", listener.getClass().getTypeName(),
System.identityHashCode(listener));
+ }
+
private boolean handleFetchResponse(
RaftResponse.Inbound responseMetadata,
long currentTimeMs
@@ -2102,10 +2113,14 @@ public class KafkaRaftClient<T> implements
RaftClient<T> {
}
private void pollListeners() {
- // Register any listeners added since the last poll
- while (!pendingListeners.isEmpty()) {
- Listener<T> listener = pendingListeners.poll();
- listenerContexts.add(new ListenerContext(listener));
+ // Apply all of the pending registrations
+ while (true) {
+ Registration<T> registration = pendingRegistrations.poll();
+ if (registration == null) {
+ break;
+ }
+
+ processRegistration(registration);
}
// Check listener progress to see if reads are expected
@@ -2114,6 +2129,25 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
});
}
+ private void processRegistration(Registration<T> registration) {
+ Listener<T> listener = registration.listener();
+ Registration.Ops ops = registration.ops();
+
+ if (ops == Registration.Ops.REGISTER) {
+ if (listenerContexts.putIfAbsent(listener, new
ListenerContext(listener)) != null) {
+ logger.error("Attempting to add a listener that already
exists: {}", listenerName(listener));
+ } else {
+ logger.info("Registered the listener {}",
listenerName(listener));
+ }
+ } else {
+ if (listenerContexts.remove(listener) == null) {
+ logger.error("Attempting to remove a listener that doesn't
exists: {}", listenerName(listener));
+ } else {
+ logger.info("Unregistered the listener {}",
listenerName(listener));
+ }
+ }
+ }
+
private boolean maybeCompleteShutdown(long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown == null) {
@@ -2377,13 +2411,43 @@ public class KafkaRaftClient<T> implements
RaftClient<T> {
}
}
+ private static final class Registration<T> {
+ private final Ops ops;
+ private final Listener<T> listener;
+
+ private Registration(Ops ops, Listener<T> listener) {
+ this.ops = ops;
+ this.listener = listener;
+ }
+
+ private Ops ops() {
+ return ops;
+ }
+
+ private Listener<T> listener() {
+ return listener;
+ }
+
+ private static enum Ops {
+ REGISTER, UNREGISTER
+ }
+
+ private static <T> Registration<T> register(Listener<T> listener) {
+ return new Registration<>(Ops.REGISTER, listener);
+ }
+
+ private static <T> Registration<T> unregister(Listener<T> listener) {
+ return new Registration<>(Ops.UNREGISTER, listener);
+ }
+ }
+
private final class ListenerContext implements
CloseListener<BatchReader<T>> {
private final RaftClient.Listener<T> listener;
// This field is used only by the Raft IO thread
private LeaderAndEpoch lastFiredLeaderChange = new
LeaderAndEpoch(OptionalInt.empty(), 0);
// These fields are visible to both the Raft IO thread and the listener
- // and are protected through synchronization on this `ListenerContext`
instance
+ // and are protected through synchronization on this ListenerContext
instance
private BatchReader<T> lastSent = null;
private long nextOffset = 0;
@@ -2395,7 +2459,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* Get the last acked offset, which is one greater than the offset of
the
* last record which was acked by the state machine.
*/
- public synchronized long nextOffset() {
+ private synchronized long nextOffset() {
return nextOffset;
}
@@ -2407,7 +2471,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* we delay sending additional data until the state machine has read
to the
* end and the last offset is determined.
*/
- public synchronized OptionalLong nextExpectedOffset() {
+ private synchronized OptionalLong nextExpectedOffset() {
if (lastSent != null) {
OptionalLong lastSentOffset = lastSent.lastOffset();
if (lastSentOffset.isPresent()) {
@@ -2424,7 +2488,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* This API is used when the Listener needs to be notified of a new
snapshot. This happens
* when the context's next offset is less than the log start offset.
*/
- public void fireHandleSnapshot(SnapshotReader<T> reader) {
+ private void fireHandleSnapshot(SnapshotReader<T> reader) {
synchronized (this) {
nextOffset = reader.snapshotId().offset;
lastSent = null;
@@ -2440,7 +2504,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* know whether it has been committed. Rather than retaining the
uncommitted
* data in memory, we let the state machine read the records from disk.
*/
- public void fireHandleCommit(long baseOffset, Records records) {
+ private void fireHandleCommit(long baseOffset, Records records) {
fireHandleCommit(
RecordsBatchReader.of(
baseOffset,
@@ -2460,7 +2524,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
* a nice optimization for the leader which is typically doing more
work than all of the
* followers.
*/
- public void fireHandleCommit(
+ private void fireHandleCommit(
long baseOffset,
int epoch,
long appendTimestamp,
@@ -2472,8 +2536,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
fireHandleCommit(reader);
}
- public String listenerName() {
- return listener.getClass().getTypeName();
+ private String listenerName() {
+ return KafkaRaftClient.listenerName(listener);
}
private void fireHandleCommit(BatchReader<T> reader) {
@@ -2489,7 +2553,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
listener.handleCommit(reader);
}
- void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+ private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
if (shouldFireLeaderChange(leaderAndEpoch)) {
lastFiredLeaderChange = leaderAndEpoch;
logger.debug("Notifying listener {} of leader change {}",
listenerName(), leaderAndEpoch);
@@ -2508,7 +2572,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
- void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long
epochStartOffset) {
+ private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long
epochStartOffset) {
// If this node is becoming the leader, then we can fire
`handleClaim` as soon
// as the listener has caught up to the start of the leader epoch.
This guarantees
// that the state machine has seen the full committed state before
it becomes
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index e0d5eaa..46cd292 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -89,15 +89,32 @@ public interface RaftClient<T> extends AutoCloseable {
void initialize();
/**
- * Register a listener to get commit/leader notifications.
+ * Register a listener to get commit, snapshot and leader notifications.
*
- * @param listener the listener
+ * The implementation of this interface assumes that each call to {@code
register} uses
+ * a different {@code Listener} instance. If the same instance is used for
multiple calls
+ * to this method, then only one {@code Listener} will be registered.
+ *
+ * @param listener the listener to register
*/
void register(Listener<T> listener);
/**
+ * Unregisters a listener.
+ *
+ * To distinguish from events that happened before the call to {@code
unregister} and a future
+ * call to {@code register}, different {@code Listener} instances must be
used.
+ *
+ * If the {@code Listener} provided was never registered then the
unregistration is ignored.
+ *
+ * @param listener the listener to unregister
+ */
+ void unregister(Listener<T> listener);
+
+ /**
* Return the current {@link LeaderAndEpoch}.
- * @return the current {@link LeaderAndEpoch}
+ *
+ * @return the current leader and epoch
*/
LeaderAndEpoch leaderAndEpoch();
diff --git a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
index 893b390..320e3d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
@@ -22,7 +22,7 @@ public final class ValidOffsetAndEpoch {
final private Kind kind;
final private OffsetAndEpoch offsetAndEpoch;
- ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
+ private ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
this.kind = kind;
this.offsetAndEpoch = offsetAndEpoch;
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index a70ad93..d85714b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2492,6 +2492,47 @@ public class KafkaRaftClientTest {
}
@Test
+ public void testReregistrationChangesListenerContext() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int epoch = 5;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ List<String> batch1 = Arrays.asList("1", "2", "3");
+ List<String> batch2 = Arrays.asList("4", "5", "6");
+ List<String> batch3 = Arrays.asList("7", "8", "9");
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(1, batch1)
+ .appendToLog(1, batch2)
+ .appendToLog(2, batch3)
+ .withUnknownLeader(epoch - 1)
+ .build();
+
+ context.becomeLeader();
+ context.client.poll();
+ assertEquals(10L, context.log.endOffset().offset);
+
+ // Let the initial listener catch up
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+ context.pollUntil(() ->
OptionalLong.of(8).equals(context.listener.lastCommitOffset()));
+
+ // Register a second listener
+ RaftClientTestContext.MockListener secondListener = new
RaftClientTestContext.MockListener(OptionalInt.of(localId));
+ context.client.register(secondListener);
+ context.pollUntil(() ->
OptionalLong.of(8).equals(secondListener.lastCommitOffset()));
+ context.client.unregister(secondListener);
+
+ // Write to the log and show that the default listener gets updated...
+ assertEquals(10L, context.client.scheduleAppend(epoch,
singletonList("a")));
+ context.client.poll();
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+ context.pollUntil(() ->
OptionalLong.of(10).equals(context.listener.lastCommitOffset()));
+ // ... but the unregistered listener doesn't
+ assertEquals(OptionalLong.of(8), secondListener.lastCommitOffset());
+ }
+
+ @Test
public void
testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances() throws
Exception {
int localId = 0;
int otherNodeId = 1;
@@ -2677,5 +2718,4 @@ public class KafkaRaftClientTest {
private static KafkaMetric getMetric(final Metrics metrics, final String
name) {
return metrics.metrics().get(metrics.metricName(name, "raft-metrics"));
}
-
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 1b2caca..9365640 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -766,8 +766,8 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch);
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new
OffsetAndEpoch(log.endOffset().offset, epoch)),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.diverging(new
OffsetAndEpoch(log.endOffset().offset, epoch)),
+ resultOffsetAndEpoch);
}
@Test
@@ -782,8 +782,7 @@ public class MockLogTest {
log.truncateToLatestSnapshot();
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(offset, epoch - 1);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.snapshot(olderEpochSnapshotId),
resultOffsetAndEpoch);
}
@Test
@@ -798,8 +797,7 @@ public class MockLogTest {
log.truncateToLatestSnapshot();
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(offset - 1, epoch);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.snapshot(olderEpochSnapshotId),
resultOffsetAndEpoch);
}
@Test
@@ -814,8 +812,7 @@ public class MockLogTest {
log.truncateToLatestSnapshot();
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(offset, epoch);
- assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID,
olderEpochSnapshotId),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.Kind.VALID,
resultOffsetAndEpoch.kind());
}
@Test
@@ -835,8 +832,7 @@ public class MockLogTest {
// offset is not equal to oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(100, 3);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(20,
2)),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2)),
resultOffsetAndEpoch);
}
@Test
@@ -854,8 +850,7 @@ public class MockLogTest {
// offset is not equal to oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(100, 2);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, olderEpochSnapshotId),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.diverging(olderEpochSnapshotId),
resultOffsetAndEpoch);
}
@Test
@@ -866,8 +861,8 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch);
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new
OffsetAndEpoch(log.endOffset().offset, epoch)),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.diverging(new
OffsetAndEpoch(log.endOffset().offset, epoch)),
+ resultOffsetAndEpoch);
}
@Test
@@ -879,8 +874,7 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch + 1);
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(11, epoch);
- assertEquals(new
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(10,
epoch)),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(10,
epoch)), resultOffsetAndEpoch);
}
@Test
@@ -891,8 +885,7 @@ public class MockLogTest {
appendBatch(numberOfRecords, epoch);
ValidOffsetAndEpoch resultOffsetAndEpoch =
log.validateOffsetAndEpoch(numberOfRecords - 1, epoch);
- assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID,
new OffsetAndEpoch(numberOfRecords - 1, epoch)),
- resultOffsetAndEpoch);
+ assertEquals(ValidOffsetAndEpoch.Kind.VALID,
resultOffsetAndEpoch.kind());
}
private Optional<OffsetRange> readOffsets(long startOffset, Isolation
isolation) {