This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 14d9aaa  KAFKA-13113; Support unregistering Raft listeners (#11109)
14d9aaa is described below

commit 14d9aaaca678eb0d8c35494321448f1283768cbe
Author: José Armando García Sancio <[email protected]>
AuthorDate: Fri Jul 23 21:54:44 2021 -0700

    KAFKA-13113; Support unregistering Raft listeners (#11109)
    
    This patch adds support for unregistering listeners to `RaftClient`.
    
    Reviewers: Colin P. McCabe <[email protected]>, Jason Gustafson 
<[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/metalog/LocalLogManager.java  |  35 +++++--
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 108 ++++++++++++++++-----
 .../java/org/apache/kafka/raft/RaftClient.java     |  23 ++++-
 .../org/apache/kafka/raft/ValidOffsetAndEpoch.java |   2 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  42 +++++++-
 .../java/org/apache/kafka/raft/MockLogTest.java    |  29 +++---
 7 files changed, 188 insertions(+), 53 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6b93eed..b7a7192 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -71,7 +71,7 @@
               
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
 
     <suppress checks="JavaNCSS"
-              
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
+              
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
 
     <suppress checks="NPathComplexity"
               
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer).java"/>
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java 
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index cfb4221..a5e3b2c 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -43,12 +43,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Optional;
@@ -418,7 +419,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     /**
      * The listener objects attached to this local log manager.
      */
-    private final List<MetaLogListenerData> listeners = new ArrayList<>();
+    private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData> 
listeners = new IdentityHashMap<>();
 
     /**
      * The current leader, as seen by this log manager.
@@ -441,7 +442,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
             try {
                 log.debug("Node {}: running log check.", nodeId);
                 int numEntriesFound = 0;
-                for (MetaLogListenerData listenerData : listeners) {
+                for (MetaLogListenerData listenerData : listeners.values()) {
                     while (true) {
                         // Load the snapshot if needed and we are not the 
leader
                         LeaderAndEpoch notifiedLeader = 
listenerData.notifiedLeader();
@@ -526,7 +527,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                 if (initialized && !shutdown) {
                     log.debug("Node {}: beginning shutdown.", nodeId);
                     resign(leader.epoch());
-                    for (MetaLogListenerData listenerData : listeners) {
+                    for (MetaLogListenerData listenerData : 
listeners.values()) {
                         listenerData.beginShutdown();
                     }
                     shared.unregisterLogManager(this);
@@ -586,8 +587,12 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                     "already been shut down.", nodeId);
                 future.complete(null);
             } else if (initialized) {
-                log.info("Node {}: registered MetaLogListener.", nodeId);
-                listeners.add(new MetaLogListenerData(listener));
+                int id = System.identityHashCode(listener);
+                if (listeners.putIfAbsent(listener, new 
MetaLogListenerData(listener)) != null) {
+                    log.error("Node {}: can't register because listener {} 
already exists", nodeId, id);
+                } else {
+                    log.info("Node {}: registered MetaLogListener {}", nodeId, 
id);
+                }
                 shared.electLeaderIfNeeded();
                 scheduleLogCheck();
                 future.complete(null);
@@ -609,6 +614,22 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     }
 
     @Override
+    public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) 
{
+        eventQueue.append(() -> {
+            if (shutdown) {
+                log.info("Node {}: can't unregister because local log manager 
is shutdown", nodeId);
+            } else {
+                int id = System.identityHashCode(listener);
+                if (listeners.remove(listener) == null) {
+                    log.error("Node {}: can't unregister because the listener 
{} doesn't exists", nodeId, id);
+                } else {
+                    log.info("Node {}: unregistered MetaLogListener {}", 
nodeId, id);
+                }
+            }
+        });
+    }
+
+    @Override
     public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
         return scheduleAtomicAppend(epoch, batch);
     }
@@ -664,7 +685,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
         final 
CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new 
CompletableFuture<>();
         eventQueue.append(() -> {
-            future.complete(listeners.stream().map(l -> 
l.listener).collect(Collectors.toList()));
+            future.complete(listeners.values().stream().map(l -> 
l.listener).collect(Collectors.toList()));
         });
         try {
             return future.get();
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index ffc43a7..8ea7daf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -75,8 +75,8 @@ import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -162,8 +162,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private final RequestManager requestManager;
     private final RaftMetadataLogCleanerManager snapshotCleaner;
 
-    private final List<ListenerContext> listenerContexts = new ArrayList<>();
-    private final ConcurrentLinkedQueue<Listener<T>> pendingListeners = new 
ConcurrentLinkedQueue<>();
+    private final Map<Listener<T>, ListenerContext> listenerContexts = new 
IdentityHashMap<>();
+    private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations 
= new ConcurrentLinkedQueue<>();
 
     /**
      * Create a new instance.
@@ -302,7 +302,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private void updateListenersProgress(long highWatermark) {
-        for (ListenerContext listenerContext : listenerContexts) {
+        for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset 
-> {
                 if (nextExpectedOffset < log.startOffset() && 
nextExpectedOffset < highWatermark) {
                     SnapshotReader<T> snapshot = 
latestSnapshot().orElseThrow(() -> new IllegalStateException(
@@ -335,7 +335,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private void maybeFireHandleCommit(long baseOffset, int epoch, long 
appendTimestamp, int sizeInBytes, List<T> records) {
-        for (ListenerContext listenerContext : listenerContexts) {
+        for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextOffset -> {
                 if (nextOffset == baseOffset) {
                     listenerContext.fireHandleCommit(baseOffset, epoch, 
appendTimestamp, sizeInBytes, records);
@@ -345,13 +345,13 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private void maybeFireLeaderChange(LeaderState<T> state) {
-        for (ListenerContext listenerContext : listenerContexts) {
+        for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), 
state.epochStartOffset());
         }
     }
 
     private void maybeFireLeaderChange() {
-        for (ListenerContext listenerContext : listenerContexts) {
+        for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
         }
     }
@@ -380,11 +380,18 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     @Override
     public void register(Listener<T> listener) {
-        pendingListeners.add(listener);
+        pendingRegistrations.add(Registration.register(listener));
         wakeup();
     }
 
     @Override
+    public void unregister(Listener<T> listener) {
+        pendingRegistrations.add(Registration.unregister(listener));
+        // No need to wakeup the polling thread. It is a removal so the 
updates can be
+        // delayed until the polling thread wakes up for other reasons.
+    }
+
+    @Override
     public LeaderAndEpoch leaderAndEpoch() {
         return quorum.leaderAndEpoch();
     }
@@ -1021,6 +1028,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
         return OptionalInt.of(leaderIdOrNil);
     }
 
+    private static String listenerName(Listener<?> listener) {
+        return String.format("%s@%s", listener.getClass().getTypeName(), 
System.identityHashCode(listener));
+    }
+
     private boolean handleFetchResponse(
         RaftResponse.Inbound responseMetadata,
         long currentTimeMs
@@ -2102,10 +2113,14 @@ public class KafkaRaftClient<T> implements 
RaftClient<T> {
     }
 
     private void pollListeners() {
-        // Register any listeners added since the last poll
-        while (!pendingListeners.isEmpty()) {
-            Listener<T> listener = pendingListeners.poll();
-            listenerContexts.add(new ListenerContext(listener));
+        // Apply all of the pending registration
+        while (true) {
+            Registration<T> registration = pendingRegistrations.poll();
+            if (registration == null) {
+                break;
+            }
+
+            processRegistration(registration);
         }
 
         // Check listener progress to see if reads are expected
@@ -2114,6 +2129,25 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
         });
     }
 
+    private void processRegistration(Registration<T> registration) {
+        Listener<T> listener = registration.listener();
+        Registration.Ops ops = registration.ops();
+
+        if (ops == Registration.Ops.REGISTER) {
+            if (listenerContexts.putIfAbsent(listener, new 
ListenerContext(listener)) != null) {
+                logger.error("Attempting to add a listener that already 
exists: {}", listenerName(listener));
+            } else {
+                logger.info("Registered the listener {}", 
listenerName(listener));
+            }
+        } else {
+            if (listenerContexts.remove(listener) == null) {
+                logger.error("Attempting to remove a listener that doesn't 
exists: {}", listenerName(listener));
+            } else {
+                logger.info("Unregistered the listener {}", 
listenerName(listener));
+            }
+        }
+    }
+
     private boolean maybeCompleteShutdown(long currentTimeMs) {
         GracefulShutdown shutdown = this.shutdown.get();
         if (shutdown == null) {
@@ -2377,13 +2411,43 @@ public class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
     }
 
+    private static final class Registration<T> {
+        private final Ops ops;
+        private final Listener<T> listener;
+
+        private Registration(Ops ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private Ops ops() {
+            return ops;
+        }
+
+        private Listener<T> listener() {
+            return listener;
+        }
+
+        private static enum Ops {
+            REGISTER, UNREGISTER
+        }
+
+        private static <T> Registration<T> register(Listener<T> listener) {
+            return new Registration<>(Ops.REGISTER, listener);
+        }
+
+        private static <T> Registration<T> unregister(Listener<T> listener) {
+            return new Registration<>(Ops.UNREGISTER, listener);
+        }
+    }
+
     private final class ListenerContext implements 
CloseListener<BatchReader<T>> {
         private final RaftClient.Listener<T> listener;
         // This field is used only by the Raft IO thread
         private LeaderAndEpoch lastFiredLeaderChange = new 
LeaderAndEpoch(OptionalInt.empty(), 0);
 
         // These fields are visible to both the Raft IO thread and the listener
-        // and are protected through synchronization on this `ListenerContext` 
instance
+        // and are protected through synchronization on this ListenerContext 
instance
         private BatchReader<T> lastSent = null;
         private long nextOffset = 0;
 
@@ -2395,7 +2459,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
          * Get the last acked offset, which is one greater than the offset of 
the
          * last record which was acked by the state machine.
          */
-        public synchronized long nextOffset() {
+        private synchronized long nextOffset() {
             return nextOffset;
         }
 
@@ -2407,7 +2471,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
          * we delay sending additional data until the state machine has read 
to the
          * end and the last offset is determined.
          */
-        public synchronized OptionalLong nextExpectedOffset() {
+        private synchronized OptionalLong nextExpectedOffset() {
             if (lastSent != null) {
                 OptionalLong lastSentOffset = lastSent.lastOffset();
                 if (lastSentOffset.isPresent()) {
@@ -2424,7 +2488,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
          * This API is used when the Listener needs to be notified of a new 
snapshot. This happens
          * when the context's next offset is less than the log start offset.
          */
-        public void fireHandleSnapshot(SnapshotReader<T> reader) {
+        private void fireHandleSnapshot(SnapshotReader<T> reader) {
             synchronized (this) {
                 nextOffset = reader.snapshotId().offset;
                 lastSent = null;
@@ -2440,7 +2504,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
          * know whether it has been committed. Rather than retaining the 
uncommitted
          * data in memory, we let the state machine read the records from disk.
          */
-        public void fireHandleCommit(long baseOffset, Records records) {
+        private void fireHandleCommit(long baseOffset, Records records) {
             fireHandleCommit(
                 RecordsBatchReader.of(
                     baseOffset,
@@ -2460,7 +2524,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
          * a nice optimization for the leader which is typically doing more 
work than all of the
          * followers.
          */
-        public void fireHandleCommit(
+        private void fireHandleCommit(
             long baseOffset,
             int epoch,
             long appendTimestamp,
@@ -2472,8 +2536,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             fireHandleCommit(reader);
         }
 
-        public String listenerName() {
-            return listener.getClass().getTypeName();
+        private String listenerName() {
+            return KafkaRaftClient.listenerName(listener);
         }
 
         private void fireHandleCommit(BatchReader<T> reader) {
@@ -2489,7 +2553,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             listener.handleCommit(reader);
         }
 
-        void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+        private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
             if (shouldFireLeaderChange(leaderAndEpoch)) {
                 lastFiredLeaderChange = leaderAndEpoch;
                 logger.debug("Notifying listener {} of leader change {}", 
listenerName(), leaderAndEpoch);
@@ -2508,7 +2572,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             }
         }
 
-        void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long 
epochStartOffset) {
+        private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long 
epochStartOffset) {
             // If this node is becoming the leader, then we can fire 
`handleClaim` as soon
             // as the listener has caught up to the start of the leader epoch. 
This guarantees
             // that the state machine has seen the full committed state before 
it becomes
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index e0d5eaa..46cd292 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -89,15 +89,32 @@ public interface RaftClient<T> extends AutoCloseable {
     void initialize();
 
     /**
-     * Register a listener to get commit/leader notifications.
+     * Register a listener to get commit, snapshot and leader notifications.
      *
-     * @param listener the listener
+     * The implementation of this interface assumes that each call to {@code 
register} uses
+     * a different {@code Listener} instance. If the same instance is used for 
multiple calls
+     * to this method, then only one {@code Listener} will be registered.
+     *
+     * @param listener the listener to register
      */
     void register(Listener<T> listener);
 
     /**
+     * Unregisters a listener.
+     *
+     * To distinguish from events that happend before the call to {@code 
unregister} and a future
+     * call to {@code register}, different {@code Listener} instances must be 
used.
+     *
+     * If the {@code Listener} provided was never registered then the 
unregistration is ignored. 
+     *
+     * @param listener the listener to unregister
+     */
+    void unregister(Listener<T> listener);
+
+    /**
      * Return the current {@link LeaderAndEpoch}.
-     * @return the current {@link LeaderAndEpoch}
+     *
+     * @return the current leader and epoch
      */
     LeaderAndEpoch leaderAndEpoch();
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java 
b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
index 893b390..320e3d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
@@ -22,7 +22,7 @@ public final class ValidOffsetAndEpoch {
     final private Kind kind;
     final private OffsetAndEpoch offsetAndEpoch;
 
-    ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
+    private ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
         this.kind = kind;
         this.offsetAndEpoch = offsetAndEpoch;
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index a70ad93..d85714b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2492,6 +2492,47 @@ public class KafkaRaftClientTest {
     }
 
     @Test
+    public void testReregistrationChangesListenerContext() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        List<String> batch1 = Arrays.asList("1", "2", "3");
+        List<String> batch2 = Arrays.asList("4", "5", "6");
+        List<String> batch3 = Arrays.asList("7", "8", "9");
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(1, batch1)
+            .appendToLog(1, batch2)
+            .appendToLog(2, batch3)
+            .withUnknownLeader(epoch - 1)
+            .build();
+
+        context.becomeLeader();
+        context.client.poll();
+        assertEquals(10L, context.log.endOffset().offset);
+
+        // Let the initial listener catch up
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+        context.pollUntil(() -> 
OptionalLong.of(8).equals(context.listener.lastCommitOffset()));
+
+        // Register a second listener
+        RaftClientTestContext.MockListener secondListener = new 
RaftClientTestContext.MockListener(OptionalInt.of(localId));
+        context.client.register(secondListener);
+        context.pollUntil(() -> 
OptionalLong.of(8).equals(secondListener.lastCommitOffset()));
+        context.client.unregister(secondListener);
+
+        // Write to the log and show that the default listener gets updated...
+        assertEquals(10L, context.client.scheduleAppend(epoch, 
singletonList("a")));
+        context.client.poll();
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+        context.pollUntil(() -> 
OptionalLong.of(10).equals(context.listener.lastCommitOffset()));
+        // ... but unregister listener doesn't
+        assertEquals(OptionalLong.of(8), secondListener.lastCommitOffset());
+    }
+
+    @Test
     public void 
testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances() throws 
Exception {
         int localId = 0;
         int otherNodeId = 1;
@@ -2677,5 +2718,4 @@ public class KafkaRaftClientTest {
     private static KafkaMetric getMetric(final Metrics metrics, final String 
name) {
         return metrics.metrics().get(metrics.metricName(name, "raft-metrics"));
     }
-
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 1b2caca..9365640 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -766,8 +766,8 @@ public class MockLogTest {
         appendBatch(numberOfRecords, epoch);
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new 
OffsetAndEpoch(log.endOffset().offset, epoch)),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.diverging(new 
OffsetAndEpoch(log.endOffset().offset, epoch)),
+            resultOffsetAndEpoch);
     }
 
     @Test
@@ -782,8 +782,7 @@ public class MockLogTest {
         log.truncateToLatestSnapshot();
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(offset, epoch - 1);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.snapshot(olderEpochSnapshotId), 
resultOffsetAndEpoch);
     }
 
     @Test
@@ -798,8 +797,7 @@ public class MockLogTest {
         log.truncateToLatestSnapshot();
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(offset - 1, epoch);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.snapshot(olderEpochSnapshotId), 
resultOffsetAndEpoch);
     }
 
     @Test
@@ -814,8 +812,7 @@ public class MockLogTest {
         log.truncateToLatestSnapshot();
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(offset, epoch);
-        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, 
olderEpochSnapshotId),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.Kind.VALID, 
resultOffsetAndEpoch.kind());
     }
 
     @Test
@@ -835,8 +832,7 @@ public class MockLogTest {
 
         // offset is not equal to oldest snapshot's offset
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(100, 3);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(20, 
2)),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2)), 
resultOffsetAndEpoch);
     }
 
     @Test
@@ -854,8 +850,7 @@ public class MockLogTest {
 
         // offset is not equal to oldest snapshot's offset
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(100, 2);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, olderEpochSnapshotId),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.diverging(olderEpochSnapshotId), 
resultOffsetAndEpoch);
     }
 
     @Test
@@ -866,8 +861,8 @@ public class MockLogTest {
         appendBatch(numberOfRecords, epoch);
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new 
OffsetAndEpoch(log.endOffset().offset, epoch)),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.diverging(new 
OffsetAndEpoch(log.endOffset().offset, epoch)),
+            resultOffsetAndEpoch);
     }
 
     @Test
@@ -879,8 +874,7 @@ public class MockLogTest {
         appendBatch(numberOfRecords, epoch + 1);
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(11, epoch);
-        assertEquals(new 
ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(10, 
epoch)),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(10, 
epoch)), resultOffsetAndEpoch);
     }
 
     @Test
@@ -891,8 +885,7 @@ public class MockLogTest {
         appendBatch(numberOfRecords, epoch);
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(numberOfRecords - 1, epoch);
-        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, 
new OffsetAndEpoch(numberOfRecords - 1, epoch)),
-                resultOffsetAndEpoch);
+        assertEquals(ValidOffsetAndEpoch.Kind.VALID, 
resultOffsetAndEpoch.kind());
     }
 
     private Optional<OffsetRange> readOffsets(long startOffset, Isolation 
isolation) {

Reply via email to