This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d69212a42246eeaa123e0a574d08ba44e37e4a1d Author: José Armando García Sancio <[email protected]> AuthorDate: Thu Mar 4 10:55:43 2021 -0800 KAFKA-12376: Apply atomic append to the log (#10253) --- core/src/main/scala/kafka/raft/RaftManager.scala | 29 ++++++-- .../controller/ClientQuotaControlManager.java | 2 +- .../kafka/controller/ClusterControlManager.java | 2 +- .../controller/ConfigurationControlManager.java | 4 +- .../apache/kafka/controller/ControllerResult.java | 38 ++++++---- .../controller/ControllerResultAndOffset.java | 32 ++++----- .../kafka/controller/FeatureControlManager.java | 3 +- .../apache/kafka/controller/QuorumController.java | 23 +++--- .../controller/ReplicationControlManager.java | 12 ++-- .../org/apache/kafka/metalog/LocalLogManager.java | 17 ++++- .../org/apache/kafka/metalog/MetaLogManager.java | 23 +++++- .../ConfigurationControlManagerTest.java | 83 ++++++++++++++++------ .../controller/FeatureControlManagerTest.java | 58 ++++++++++----- .../org/apache/kafka/metalog/LocalLogManager.java | 17 ++++- .../kafka/raft/metadata/MetaLogRaftShim.java | 28 +++++++- 15 files changed, 268 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index 1881a1d..3b714f3 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -92,6 +92,11 @@ trait RaftManager[T] { listener: RaftClient.Listener[T] ): Unit + def scheduleAtomicAppend( + epoch: Int, + records: Seq[T] + ): Option[Long] + def scheduleAppend( epoch: Int, records: Seq[T] @@ -157,16 +162,32 @@ class KafkaRaftManager[T]( raftClient.register(listener) } + override def scheduleAtomicAppend( + epoch: Int, + records: Seq[T] + ): Option[Long] = { + append(epoch, records, true) + } + override def scheduleAppend( epoch: Int, records: Seq[T] ): Option[Long] = { - val offset: java.lang.Long = raftClient.scheduleAppend(epoch, records.asJava) - if 
(offset == null) { - None + append(epoch, records, false) + } + + private def append( + epoch: Int, + records: Seq[T], + isAtomic: Boolean + ): Option[Long] = { + val offset = if (isAtomic) { + raftClient.scheduleAtomicAppend(epoch, records.asJava) } else { - Some(Long.unbox(offset)) + raftClient.scheduleAppend(epoch, records.asJava) } + + Option(offset).map(Long.unbox) } override def handleRequest( diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java index 4aac9e4..9b8e2d6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java @@ -86,7 +86,7 @@ public class ClientQuotaControlManager { } }); - return new ControllerResult<>(outputRecords, outputResults); + return ControllerResult.atomicOf(outputRecords, outputResults); } /** diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 6e329c7..4748d19 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -213,7 +213,7 @@ public class ClusterControlManager { List<ApiMessageAndVersion> records = new ArrayList<>(); records.add(new ApiMessageAndVersion(record, (short) 0)); - return new ControllerResult<>(records, new BrokerRegistrationReply(brokerEpoch)); + return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch)); } public void replay(RegisterBrokerRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 16e58fa..dcfe92d46 100644 --- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -83,7 +83,7 @@ public class ConfigurationControlManager { outputRecords, outputResults); } - return new ControllerResult<>(outputRecords, outputResults); + return ControllerResult.atomicOf(outputRecords, outputResults); } private void incrementalAlterConfigResource(ConfigResource configResource, @@ -171,7 +171,7 @@ public class ConfigurationControlManager { outputRecords, outputResults); } - return new ControllerResult<>(outputRecords, outputResults); + return ControllerResult.atomicOf(outputRecords, outputResults); } private void legacyAlterConfigResource(ConfigResource configResource, diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java index 4906c8b..e6ae031 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java @@ -19,7 +19,7 @@ package org.apache.kafka.controller; import org.apache.kafka.metadata.ApiMessageAndVersion; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -28,15 +28,13 @@ import java.util.stream.Collectors; class ControllerResult<T> { private final List<ApiMessageAndVersion> records; private final T response; + private final boolean isAtomic; - public ControllerResult(T response) { - this(new ArrayList<>(), response); - } - - public ControllerResult(List<ApiMessageAndVersion> records, T response) { + protected ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) { Objects.requireNonNull(records); this.records = records; this.response = response; + this.isAtomic = isAtomic; } public List<ApiMessageAndVersion> records() { @@ -47,6 
+45,10 @@ class ControllerResult<T> { return response; } + public boolean isAtomic() { + return isAtomic; + } + @Override public boolean equals(Object o) { if (o == null || (!o.getClass().equals(getClass()))) { @@ -54,22 +56,34 @@ class ControllerResult<T> { } ControllerResult other = (ControllerResult) o; return records.equals(other.records) && - Objects.equals(response, other.response); + Objects.equals(response, other.response) && + Objects.equals(isAtomic, other.isAtomic); } @Override public int hashCode() { - return Objects.hash(records, response); + return Objects.hash(records, response, isAtomic); } @Override public String toString() { - return "ControllerResult(records=" + String.join(",", - records.stream().map(r -> r.toString()).collect(Collectors.toList())) + - ", response=" + response + ")"; + return String.format( + "ControllerResult(records=%s, response=%s, isAtomic=%s)", + String.join(",", records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())), + response, + isAtomic + ); } public ControllerResult<T> withoutRecords() { - return new ControllerResult<>(new ArrayList<>(), response); + return new ControllerResult<>(Collections.emptyList(), response, false); + } + + public static <T> ControllerResult<T> atomicOf(List<ApiMessageAndVersion> records, T response) { + return new ControllerResult<>(records, response, true); + } + + public static <T> ControllerResult<T> of(List<ApiMessageAndVersion> records, T response) { + return new ControllerResult<>(records, response, false); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java index 5e483f7..8b8ca8d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java @@ -19,24 +19,15 @@ package org.apache.kafka.controller; import 
org.apache.kafka.metadata.ApiMessageAndVersion; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -class ControllerResultAndOffset<T> extends ControllerResult<T> { +final class ControllerResultAndOffset<T> extends ControllerResult<T> { private final long offset; - public ControllerResultAndOffset(T response) { - super(new ArrayList<>(), response); - this.offset = -1; - } - - public ControllerResultAndOffset(long offset, - List<ApiMessageAndVersion> records, - T response) { - super(records, response); + private ControllerResultAndOffset(long offset, ControllerResult<T> result) { + super(result.records(), result.response(), result.isAtomic()); this.offset = offset; } @@ -52,18 +43,27 @@ class ControllerResultAndOffset<T> extends ControllerResult<T> { ControllerResultAndOffset other = (ControllerResultAndOffset) o; return records().equals(other.records()) && response().equals(other.response()) && + isAtomic() == other.isAtomic() && offset == other.offset; } @Override public int hashCode() { - return Objects.hash(records(), response(), offset); + return Objects.hash(records(), response(), isAtomic(), offset); } @Override public String toString() { - return "ControllerResultAndOffset(records=" + String.join(",", - records().stream().map(r -> r.toString()).collect(Collectors.toList())) + - ", response=" + response() + ", offset=" + offset + ")"; + return String.format( + "ControllerResultAndOffset(records=%s, response=%s, isAtomic=%s, offset=%s)", + String.join(",", records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())), + response(), + isAtomic(), + offset + ); + } + + public static <T> ControllerResultAndOffset<T> of(long offset, ControllerResult<T> result) { + return new ControllerResultAndOffset<>(offset, result); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index 25ff3fd..99874ac 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -69,7 +69,8 @@ public class FeatureControlManager { results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(), downgradeables.contains(entry.getKey()), brokerFeatures, records)); } - return new ControllerResult<>(records, results); + + return ControllerResult.atomicOf(records, results); } private ApiError updateFeature(String featureName, diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 3e5a9a8..6ee1b7e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -17,7 +17,6 @@ package org.apache.kafka.controller; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -268,7 +267,7 @@ public final class QuorumController implements Controller { class ControlEvent implements EventQueue.Event { private final String name; private final Runnable handler; - private long eventCreatedTimeNs = time.nanoseconds(); + private final long eventCreatedTimeNs = time.nanoseconds(); private Optional<Long> startProcessingTimeNs = Optional.empty(); ControlEvent(String name, Runnable handler) { @@ -310,7 +309,7 @@ public final class QuorumController implements Controller { private final String name; private final CompletableFuture<T> future; private final Supplier<T> handler; - private long eventCreatedTimeNs = time.nanoseconds(); + private final long eventCreatedTimeNs = time.nanoseconds(); private Optional<Long> startProcessingTimeNs = Optional.empty(); ControllerReadEvent(String name, Supplier<T> handler) { @@ 
-392,7 +391,7 @@ public final class QuorumController implements Controller { private final String name; private final CompletableFuture<T> future; private final ControllerWriteOperation<T> op; - private long eventCreatedTimeNs = time.nanoseconds(); + private final long eventCreatedTimeNs = time.nanoseconds(); private Optional<Long> startProcessingTimeNs = Optional.empty(); private ControllerResultAndOffset<T> resultAndOffset; @@ -426,8 +425,7 @@ public final class QuorumController implements Controller { if (!maybeOffset.isPresent()) { // If the purgatory is empty, there are no pending operations and no // uncommitted state. We can return immediately. - resultAndOffset = new ControllerResultAndOffset<>(-1, - new ArrayList<>(), result.response()); + resultAndOffset = ControllerResultAndOffset.of(-1, result); log.debug("Completing read-only operation {} immediately because " + "the purgatory is empty.", this); complete(null); @@ -435,8 +433,7 @@ public final class QuorumController implements Controller { } // If there are operations in the purgatory, we want to wait for the latest // one to complete before returning our result to the user. - resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(), - result.records(), result.response()); + resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result); log.debug("Read-only operation {} will be completed when the log " + "reaches offset {}", this, resultAndOffset.offset()); } else { @@ -444,11 +441,15 @@ public final class QuorumController implements Controller { // written before we can return our result to the user. Here, we hand off // the batch of records to the metadata log manager. They will be written // out asynchronously. 
- long offset = logManager.scheduleWrite(controllerEpoch, result.records()); + final long offset; + if (result.isAtomic()) { + offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records()); + } else { + offset = logManager.scheduleWrite(controllerEpoch, result.records()); + } op.processBatchEndOffset(offset); writeOffset = offset; - resultAndOffset = new ControllerResultAndOffset<>(offset, - result.records(), result.response()); + resultAndOffset = ControllerResultAndOffset.of(offset, result); for (ApiMessageAndVersion message : result.records()) { replay(message.message(), offset); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 14ff321..8bc0670 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -440,7 +440,7 @@ public class ReplicationControlManager { resultsPrefix = ", "; } log.info("createTopics result(s): {}", resultsBuilder.toString()); - return new ControllerResult<>(records, data); + return ControllerResult.atomicOf(records, data); } private ApiError createTopic(CreatableTopic topic, @@ -721,7 +721,7 @@ public class ReplicationControlManager { setIsr(partitionData.newIsr())); } } - return new ControllerResult<>(records, response); + return ControllerResult.of(records, response); } /** @@ -875,7 +875,7 @@ public class ReplicationControlManager { setErrorMessage(error.message())); } } - return new ControllerResult<>(records, response); + return ControllerResult.of(records, response); } static boolean electionIsUnclean(byte electionType) { @@ -970,7 +970,7 @@ public class ReplicationControlManager { states.next().fenced(), states.next().inControlledShutdown(), states.next().shouldShutDown()); - return new ControllerResult<>(records, reply); + return ControllerResult.of(records, 
reply); } int bestLeader(int[] replicas, int[] isr, boolean unclean) { @@ -999,7 +999,7 @@ public class ReplicationControlManager { } List<ApiMessageAndVersion> records = new ArrayList<>(); handleBrokerUnregistered(brokerId, registration.epoch(), records); - return new ControllerResult<>(records, null); + return ControllerResult.of(records, null); } ControllerResult<Void> maybeFenceStaleBrokers() { @@ -1011,6 +1011,6 @@ public class ReplicationControlManager { handleBrokerFenced(brokerId, records); heartbeatManager.fence(brokerId); } - return new ControllerResult<>(records, null); + return ControllerResult.of(records, null); } } diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java index ef85314..99ae3a7 100644 --- a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java @@ -328,8 +328,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable { @Override public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) { - return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch( - batch.stream().map(r -> r.message()).collect(Collectors.toList()))); + return scheduleAtomicWrite(epoch, batch); + } + + @Override + public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) { + return shared.tryAppend( + nodeId, + leader.epoch(), + new LocalRecordBatch( + batch + .stream() + .map(ApiMessageAndVersion::message) + .collect(Collectors.toList()) + ) + ); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java index 67a6ca5..9126245 100644 --- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java +++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java @@ -50,14 +50,31 @@ public interface 
MetaLogManager { * offset before renouncing its leadership. The listener should determine this by * monitoring the committed offsets. * - * @param epoch The controller epoch. - * @param batch The batch of messages to write. + * @param epoch the controller epoch + * @param batch the batch of messages to write * - * @return The offset of the message. + * @return the offset of the last message in the batch + * @throws IllegalArgumentException if buffer allocation failed and the client should back off */ long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch); /** + * Schedule an atomic write to the log. + * + * The write will be scheduled to happen at some time in the future. All of the messages in batch + * will be appended atomically in one batch. The listener may regard the write as successful + * if and only if the MetaLogManager reaches the given offset before renouncing its leadership. + * The listener should determine this by monitoring the committed offsets. + * + * @param epoch the controller epoch + * @param batch the batch of messages to write + * + * @return the offset of the last message in the batch + * @throws IllegalArgumentException if buffer allocation failed and the client should back off + */ + long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch); + + /** * Renounce the leadership. * * @param epoch The epoch. 
If this does not match the current epoch, this diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 49a5533..561a25b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -135,18 +135,42 @@ public class ConfigurationControlManagerTest { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); ConfigurationControlManager manager = new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS); - assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList( - new ApiMessageAndVersion(new ConfigRecord(). - setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("abc").setValue("123"), (short) 0)), - toMap(entry(BROKER0, new ApiError( - Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")), - entry(MYTOPIC, ApiError.NONE))), - manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap( - entry("foo.bar", entry(DELETE, "abc")), - entry("quux", entry(SET, "abc")))), - entry(MYTOPIC, toMap( - entry("abc", entry(APPEND, "123"))))))); + assertEquals( + ControllerResult.atomicOf( + Collections.singletonList( + new ApiMessageAndVersion( + new ConfigRecord() + .setResourceType(TOPIC.id()) + .setResourceName("mytopic") + .setName("abc") + .setValue("123"), + (short) 0 + ) + ), + toMap( + entry( + BROKER0, + new ApiError( + Errors.INVALID_REQUEST, + "A DELETE op was given with a non-null value." 
+ ) + ), + entry(MYTOPIC, ApiError.NONE) + ) + ), + manager.incrementalAlterConfigs( + toMap( + entry( + BROKER0, + toMap( + entry("foo.bar", entry(DELETE, "abc")), + entry("quux", entry(SET, "abc")) + ) + ), + entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123")))) + ) + ) + ); } @Test @@ -184,20 +208,33 @@ public class ConfigurationControlManagerTest { new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). setName("def").setValue("901"), (short) 0)); - assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>( + assertEquals( + ControllerResult.atomicOf( expectedRecords1, - toMap(entry(MYTOPIC, ApiError.NONE))), - manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap( - entry("abc", "456"), entry("def", "901")))))); + toMap(entry(MYTOPIC, ApiError.NONE)) + ), + manager.legacyAlterConfigs( + toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901")))) + ) + ); for (ApiMessageAndVersion message : expectedRecords1) { manager.replay((ConfigRecord) message.message()); } - assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Arrays.asList( - new ApiMessageAndVersion(new ConfigRecord(). - setResourceType(TOPIC.id()).setResourceName("mytopic"). 
- setName("abc").setValue(null), (short) 0)), - toMap(entry(MYTOPIC, ApiError.NONE))), - manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap( - entry("def", "901")))))); + assertEquals( + ControllerResult.atomicOf( + Arrays.asList( + new ApiMessageAndVersion( + new ConfigRecord() + .setResourceType(TOPIC.id()) + .setResourceName("mytopic") + .setName("abc") + .setValue(null), + (short) 0 + ) + ), + toMap(entry(MYTOPIC, ApiError.NONE)) + ), + manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901"))))) + ); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index 8687cc8..0670984 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -18,10 +18,8 @@ package org.apache.kafka.controller; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import org.apache.kafka.common.metadata.FeatureLevelRecord; @@ -61,11 +59,11 @@ public class FeatureControlManagerTest { rangeMap("foo", 1, 2), snapshotRegistry); assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1), manager.finalizedFeatures(-1)); - assertEquals(new ControllerResult<>(Collections. + assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections. 
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, "The controller does not support the given feature range."))), manager.updateFeatures(rangeMap("foo", 1, 3), - new HashSet<>(Arrays.asList("foo")), + Collections.singleton("foo"), Collections.emptyMap())); ControllerResult<Map<String, ApiError>> result = manager.updateFeatures( rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(), @@ -101,12 +99,24 @@ public class FeatureControlManagerTest { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); FeatureControlManager manager = new FeatureControlManager( rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry); - assertEquals(new ControllerResult<>(Collections. - singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, - "Broker 5 does not support the given feature range."))), - manager.updateFeatures(rangeMap("foo", 1, 3), - new HashSet<>(Arrays.asList("foo")), - Collections.singletonMap(5, rangeMap()))); + + assertEquals( + ControllerResult.atomicOf( + Collections.emptyList(), + Collections.singletonMap( + "foo", + new ApiError( + Errors.INVALID_UPDATE_VERSION, + "Broker 5 does not support the given feature range." + ) + ) + ), + manager.updateFeatures( + rangeMap("foo", 1, 3), + Collections.singleton("foo"), + Collections.singletonMap(5, rangeMap()) + ) + ); ControllerResult<Map<String, ApiError>> result = manager.updateFeatures( rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap()); @@ -114,19 +124,31 @@ public class FeatureControlManagerTest { manager.replay((FeatureLevelRecord) result.records().get(0).message(), 3); snapshotRegistry.createSnapshot(3); - assertEquals(new ControllerResult<>(Collections. + assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections. 
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION, "Can't downgrade the maximum version of this feature without " + "setting downgradable to true."))), manager.updateFeatures(rangeMap("foo", 1, 2), Collections.emptySet(), Collections.emptyMap())); - assertEquals(new ControllerResult<>( - Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord(). - setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2), - (short) 0)), - Collections.singletonMap("foo", ApiError.NONE)), - manager.updateFeatures(rangeMap("foo", 1, 2), - new HashSet<>(Collections.singletonList("foo")), Collections.emptyMap())); + assertEquals( + ControllerResult.atomicOf( + Collections.singletonList( + new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName("foo") + .setMinFeatureLevel((short) 1) + .setMaxFeatureLevel((short) 2), + (short) 0 + ) + ), + Collections.singletonMap("foo", ApiError.NONE) + ), + manager.updateFeatures( + rangeMap("foo", 1, 2), + Collections.singleton("foo"), + Collections.emptyMap() + ) + ); } } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 7b6cf06..590f89c 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -371,8 +371,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable { @Override public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) { - return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch( - batch.stream().map(r -> r.message()).collect(Collectors.toList()))); + return scheduleAtomicWrite(epoch, batch); + } + + @Override + public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) { + return shared.tryAppend( + nodeId, + leader.epoch(), + new LocalRecordBatch( + batch + .stream() + .map(ApiMessageAndVersion::message) + 
.collect(Collectors.toList()) + ) + ); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java index bf88e7d..1ca63f1 100644 --- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java +++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java @@ -53,8 +53,34 @@ public class MetaLogRaftShim implements MetaLogManager { } @Override + public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) { + return write(epoch, batch, true); + } + + @Override public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) { - return client.scheduleAppend((int) epoch, batch); + return write(epoch, batch, false); + } + + private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) { + final Long result; + if (isAtomic) { + result = client.scheduleAtomicAppend((int) epoch, batch); + } else { + result = client.scheduleAppend((int) epoch, batch); + } + + if (result == null) { + throw new IllegalArgumentException( + String.format( + "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)", + epoch, + batch + ) + ); + } else { + return result; + } } @Override
