This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 6016b15beab KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
6016b15beab is described below
commit 6016b15beabd774de6a358f5fdb62a336b7de43e
Author: David Jacot <[email protected]>
AuthorDate: Wed Jun 12 08:29:50 2024 +0200
KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
This patch is the continuation of
https://github.com/apache/kafka/pull/15964. It introduces record
coalescing to the CoordinatorRuntime. It also introduces a new configuration,
`group.coordinator.append.linger.ms`, which allows administrators to choose the
linger time or to disable lingering by setting it to zero. The new configuration
defaults to 10ms.
Reviewers: Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
---
.../kafka/common/record/MemoryRecordsBuilder.java | 12 +
.../src/main/scala/kafka/server/BrokerServer.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +
.../coordinator/group/GroupCoordinatorConfig.java | 12 +
.../coordinator/group/GroupCoordinatorService.java | 2 +-
.../group/runtime/CoordinatorRuntime.java | 672 ++++++++++++++-----
.../group/GroupCoordinatorConfigTest.java | 3 +
.../group/GroupCoordinatorServiceTest.java | 1 +
.../group/runtime/CoordinatorRuntimeTest.java | 736 ++++++++++++++++++++-
.../apache/kafka/server/util/timer/TimerTask.java | 4 +
10 files changed, 1263 insertions(+), 182 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a5985103ec0..bbcd99070cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable {
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
+ /**
+ * Check if we have room for a given number of bytes.
+ */
+ public boolean hasRoomFor(int estimatedRecordsSize) {
+ if (isFull()) return false;
+ return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize;
+ }
+
+ public int maxAllowedBytes() {
+ return this.writeLimit - this.batchHeaderSizeInBytes;
+ }
+
public boolean isClosed() {
return builtRecords != null;
}
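
The two helpers above are consumed by the CoordinatorRuntime changes further down
in this patch. As a rough sketch of the intended calling pattern (hedged:
`flushAndAllocate` is a hypothetical callback standing in for the runtime's
flush-then-reallocate logic, not an API from this patch):

    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import java.util.function.Supplier;

    static MemoryRecordsBuilder ensureRoom(
        MemoryRecordsBuilder builder,
        int estimatedSize,
        Supplier<MemoryRecordsBuilder> flushAndAllocate
    ) {
        // The records can never fit, not even in a freshly allocated batch.
        if (estimatedSize > builder.maxAllowedBytes()) {
            throw new RecordTooLargeException("Estimated size " + estimatedSize +
                " bytes exceeds the maximum allowed batch payload.");
        }
        // The records do not fit in the current batch: flush it and start a new one.
        if (!builder.hasRoomFor(estimatedSize)) {
            builder = flushAndAllocate.get();
        }
        return builder;
    }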
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index e143dd26668..cb4799afdfb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -572,6 +572,7 @@ class BrokerServer(
val serde = new CoordinatorRecordSerde
val groupCoordinatorConfig = new GroupCoordinatorConfig(
config.groupCoordinatorNumThreads,
+ config.groupCoordinatorAppendLingerMs,
config.consumerGroupSessionTimeoutMs,
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 745c8648e38..6c9ef51fb0a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -278,6 +278,7 @@ object KafkaConfig {
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
+ .define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
// Internal configuration used by integration and system tests.
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
@@ -948,6 +949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) || groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
+ val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)
/** Consumer group configs */
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 84219aa46ff..cc86b6cf818 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -57,6 +57,10 @@ public class GroupCoordinatorConfig {
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
+ public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
+ public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
+ "wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
+ public final static int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 10;
public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator.";
@@ -164,6 +168,12 @@ public class GroupCoordinatorConfig {
*/
public final int numThreads;
+ /**
+ * The duration in milliseconds that the coordinator will wait for writes to
+ * accumulate before flushing them to disk.
+ */
+ public final int appendLingerMs;
+
/**
* The consumer group session timeout in milliseconds.
*/
@@ -259,6 +269,7 @@ public class GroupCoordinatorConfig {
public GroupCoordinatorConfig(
int numThreads,
+ int appendLingerMs,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
@@ -277,6 +288,7 @@ public class GroupCoordinatorConfig {
CompressionType compressionType
) {
this.numThreads = numThreads;
+ this.appendLingerMs = appendLingerMs;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMaxSize = consumerGroupMaxSize;
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 5e4e899faa6..f92594f09d0 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -182,12 +182,12 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
- .withTime(time)
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.withSerializer(new CoordinatorRecordSerde())
.withCompression(Compression.of(config.compressionType).build())
+ .withAppendLingerMs(config.appendLingerMs)
.build();
return new GroupCoordinatorService(
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 4207c94770b..57927867c89 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -24,11 +24,13 @@ import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -50,10 +52,12 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
@@ -66,8 +70,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import static org.apache.kafka.common.record.Record.EMPTY_HEADERS;
-
/**
* The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
* or the transaction coordinator.
@@ -115,6 +117,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
private CoordinatorMetrics coordinatorMetrics;
private Serializer<U> serializer;
private Compression compression;
+ private int appendLingerMs;
public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@@ -181,6 +184,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
return this;
}
+ public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
+ this.appendLingerMs = appendLingerMs;
+ return this;
+ }
+
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
logPrefix = "";
@@ -206,6 +214,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
throw new IllegalArgumentException("Serializer must be set.");
if (compression == null)
compression = Compression.NONE;
+ if (appendLingerMs < 0)
+ throw new IllegalArgumentException("AppendLinger must be >= 0");
return new CoordinatorRuntime<>(
logPrefix,
@@ -220,7 +230,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
runtimeMetrics,
coordinatorMetrics,
serializer,
- compression
+ compression,
+ appendLingerMs
);
}
}
@@ -275,7 +286,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
FAILED {
@Override
boolean canTransitionFrom(CoordinatorState state) {
- return state == LOADING;
+ return state == LOADING || state == ACTIVE;
}
};
@@ -434,6 +445,81 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
+ /**
+ * A simple container class to hold all the attributes
+ * related to a pending batch.
+ */
+ private static class CoordinatorBatch {
+ /**
+ * The base (or first) offset of the batch. If the batch fails
+ * for any reason, the state machine is rolled back to it.
+ */
+ final long baseOffset;
+
+ /**
+ * The time at which the batch was created.
+ */
+ final long appendTimeMs;
+
+ /**
+ * The max batch size.
+ */
+ final int maxBatchSize;
+
+ /**
+ * The verification guard associated with the batch if it is
+ * transactional.
+ */
+ final VerificationGuard verificationGuard;
+
+ /**
+ * The byte buffer backing the records builder.
+ */
+ final ByteBuffer buffer;
+
+ /**
+ * The records builder.
+ */
+ final MemoryRecordsBuilder builder;
+
+ /**
+ * The timer used to enforce the append linger time if
+ * it is non-zero.
+ */
+ final Optional<TimerTask> lingerTimeoutTask;
+
+ /**
+ * The list of deferred events associated with the batch.
+ */
+ final List<DeferredEvent> deferredEvents;
+
+ /**
+ * The next offset. This is updated when records
+ * are added to the batch.
+ */
+ long nextOffset;
+
+ CoordinatorBatch(
+ long baseOffset,
+ long appendTimeMs,
+ int maxBatchSize,
+ VerificationGuard verificationGuard,
+ ByteBuffer buffer,
+ MemoryRecordsBuilder builder,
+ Optional<TimerTask> lingerTimeoutTask
+ ) {
+ this.baseOffset = baseOffset;
+ this.nextOffset = baseOffset;
+ this.appendTimeMs = appendTimeMs;
+ this.maxBatchSize = maxBatchSize;
+ this.verificationGuard = verificationGuard;
+ this.buffer = buffer;
+ this.builder = builder;
+ this.lingerTimeoutTask = lingerTimeoutTask;
+ this.deferredEvents = new ArrayList<>();
+ }
+ }
+
/**
* CoordinatorContext holds all the metadata around a coordinator state machine.
*/
@@ -493,6 +579,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
BufferSupplier bufferSupplier;
+ /**
+ * The current (or pending) batch.
+ */
+ CoordinatorBatch currentBatch;
+
/**
* Constructor.
*
@@ -547,6 +638,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
.build(),
tp
);
+ load();
break;
case ACTIVE:
@@ -573,6 +665,46 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
runtimeMetrics.recordPartitionStateChange(oldState, state);
}
+ /**
+ * Loads the coordinator.
+ */
+ private void load() {
+ if (state != CoordinatorState.LOADING) {
+ throw new IllegalStateException("Coordinator must be in loading state");
+ }
+
+ loader.load(tp, coordinator).whenComplete((summary, exception) -> {
+ scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + epoch + ")", tp, () -> {
+ CoordinatorContext context = coordinators.get(tp);
+ if (context != null) {
+ if (context.state != CoordinatorState.LOADING) {
+ log.info("Ignored load completion from {} because
context is in {} state.",
+ context.tp, context.state);
+ return;
+ }
+ try {
+ if (exception != null) throw exception;
+ context.transitionTo(CoordinatorState.ACTIVE);
+ if (summary != null) {
+ runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
+ log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms " +
+ "was spent in the scheduler. Loaded {} records which total to {} bytes.",
+ tp, epoch, summary.endTimeMs() - summary.startTimeMs(),
+ summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
+ }
+ } catch (Throwable ex) {
+ log.error("Failed to load metadata from {} with
epoch {} due to {}.",
+ tp, epoch, ex.toString());
+ context.transitionTo(CoordinatorState.FAILED);
+ }
+ } else {
+ log.debug("Failed to complete the loading of metadata
for {} in epoch {} since the coordinator does not exist.",
+ tp, epoch);
+ }
+ });
+ });
+ }
+
/**
* Unloads the coordinator.
*/
@@ -583,11 +715,352 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
timer.cancelAll();
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+ failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
}
coordinator = null;
}
+
+ /**
+ * Frees the current batch.
+ */
+ private void freeCurrentBatch() {
+ // Cancel the linger timeout.
+ currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+ // Release the buffer.
+ bufferSupplier.release(currentBatch.buffer);
+
+ currentBatch = null;
+ }
+
+ /**
+ * Flushes the current (or pending) batch to the log. When the batch is written
+ * locally, a new snapshot is created in the snapshot registry and the events
+ * associated with the batch are added to the deferred event queue.
+ */
+ private void flushCurrentBatch() {
+ if (currentBatch != null) {
+ try {
+ // Write the records to the log and update the last written offset.
+ long offset = partitionWriter.append(
+ tp,
+ currentBatch.verificationGuard,
+ currentBatch.builder.build()
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ if (offset != currentBatch.nextOffset) {
+ log.error("The state machine of the coordinator {} is out of sync with the underlying log. " +
+ "The last written offset returned is {} while the coordinator expected {}. The coordinator " +
+ "will be reloaded in order to re-synchronize the state machine.",
+ tp, offset, currentBatch.nextOffset);
+ // Transition to FAILED state to unload the state machine and complete
+ // exceptionally all the pending operations.
+ transitionTo(CoordinatorState.FAILED);
+ // Transition to LOADING to trigger the restoration of the state.
+ transitionTo(CoordinatorState.LOADING);
+ // Throw NotCoordinatorException to fail the operation that
+ // triggered the write. We use NotCoordinatorException to be
+ // consistent with the transition to FAILED.
+ throw Errors.NOT_COORDINATOR.exception();
+ }
+
+ // Add all the pending deferred events to the deferred event queue.
+ for (DeferredEvent event : currentBatch.deferredEvents) {
+ deferredEventQueue.add(offset, event);
+ }
+
+ // Free up the current batch.
+ freeCurrentBatch();
+ } catch (Throwable t) {
+ log.error("Writing records to {} failed due to: {}.", tp, t.getMessage());
+ failCurrentBatch(t);
+ // We rethrow the exception for the caller to handle it too.
+ throw t;
+ }
+ }
+ }
+
+ /**
+ * Flushes the current batch if it is transactional or if it has passed the append linger time.
+ */
+ private void maybeFlushCurrentBatch(long currentTimeMs) {
+ if (currentBatch != null) {
+ if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs) {
+ flushCurrentBatch();
+ }
+ }
+ }
+
+ /**
+ * Fails the current batch, reverts the snapshot to the base/start offset of the
+ * batch, and fails all the associated events.
+ */
+ private void failCurrentBatch(Throwable t) {
+ if (currentBatch != null) {
+ coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+ for (DeferredEvent event : currentBatch.deferredEvents) {
+ event.complete(t);
+ }
+ freeCurrentBatch();
+ }
+ }
+
+ /**
+ * Allocates a new batch if none already exists.
+ */
+ private void maybeAllocateNewBatch(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ long currentTimeMs
+ ) {
+ if (currentBatch == null) {
+ LogConfig logConfig = partitionWriter.config(tp);
+ byte magic = logConfig.recordVersion().value;
+ int maxBatchSize = logConfig.maxMessageSize();
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ magic,
+ compression,
+ TimestampType.CREATE_TIME,
+ 0L,
+ currentTimeMs,
+ producerId,
+ producerEpoch,
+ 0,
+ producerId != RecordBatch.NO_PRODUCER_ID,
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ maxBatchSize
+ );
+
+ Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+ if (appendLingerMs > 0) {
+ lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) {
+ @Override
+ public void run() {
+ // An event to flush the batch is pushed to the front of the queue
+ // to ensure that the linger time is respected.
+ enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
+ if (this.isCancelled()) return;
+ withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
+ }));
+ }
+ });
+ CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+ }
+
+ currentBatch = new CoordinatorBatch(
+ prevLastWrittenOffset,
+ currentTimeMs,
+ maxBatchSize,
+ verificationGuard,
+ buffer,
+ builder,
+ lingerTimeoutTask
+ );
+ }
+ }
+
+ /**
+ * Appends records to the log and replays them to the state machine.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param records The records to append.
+ * @param replay A boolean indicating whether the records
+ * must be replayed or not.
+ * @param event The event that must be completed when the
+ * records are written.
+ */
+ private void append(
+ long producerId,
+ short producerEpoch,
+ VerificationGuard verificationGuard,
+ List<U> records,
+ boolean replay,
+ DeferredEvent event
+ ) {
+ if (state != CoordinatorState.ACTIVE) {
+ throw new IllegalStateException("Coordinator must be active to append records");
+ }
+
+ if (records.isEmpty()) {
+ // If the records are empty, it was a read operation after all. In this case,
+ // the response can be returned directly iff there are no pending write operations;
+ // otherwise, the read needs to wait on the last write operation to be completed.
+ if (currentBatch != null) {
+ currentBatch.deferredEvents.add(event);
+ } else {
+ OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
+ if (pendingOffset.isPresent()) {
+ deferredEventQueue.add(pendingOffset.getAsLong(), event);
+ } else {
+ event.complete(null);
+ }
+ }
+ } else {
+ // If the records are not empty, first, they are applied to the state machine,
+ // second, they are appended to the opened batch.
+ long currentTimeMs = time.milliseconds();
+
+ // If the current write operation is transactional, the current batch
+ // is written before proceeding with it.
+ if (producerId != RecordBatch.NO_PRODUCER_ID) {
+ // If flushing fails, we don't catch the exception in order to let
+ // the caller fail the current operation.
+ flushCurrentBatch();
+ }
+
+ // Allocate a new batch if none exists.
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+
+ // Prepare the records.
+ List<SimpleRecord> recordsToAppend = new ArrayList<>(records.size());
+ for (U record : records) {
+ recordsToAppend.add(new SimpleRecord(
+ currentTimeMs,
+ serializer.serializeKey(record),
+ serializer.serializeValue(record)
+ ));
+ }
+
+ // Compute the estimated size of the records.
+ int estimatedSize = AbstractRecords.estimateSizeInBytes(
+ currentBatch.builder.magic(),
+ compression.type(),
+ recordsToAppend
+ );
+
+ // Check if the current batch has enough space. We check it before
+ // replaying the records in order to avoid having to revert
+ // changes if the records do not fit within a batch.
+ if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+ throw new RecordTooLargeException("Message batch size is " + estimatedSize +
+ " bytes in append to partition " + tp + " which exceeds the maximum " +
+ "configured size of " + currentBatch.maxBatchSize + ".");
+ }
+
+ if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ // Otherwise, we write the current batch, allocate a new one and re-verify
+ // whether the records fit in it.
+ // If flushing fails, we don't catch the exception in order to let
+ // the caller fail the current operation.
+ flushCurrentBatch();
+ maybeAllocateNewBatch(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ currentTimeMs
+ );
+ }
+
+ // Add the event to the list of pending events associated with the batch.
+ currentBatch.deferredEvents.add(event);
+
+ try {
+ // Apply record to the state machine.
+ if (replay) {
+ for (int i = 0; i < records.size(); i++) {
+ // We compute the offset of the record based on the last written offset. The
+ // coordinator is the single writer to the underlying partition so we can
+ // deduce it like this.
+ coordinator.replay(
+ currentBatch.nextOffset + i,
+ producerId,
+ producerEpoch,
+ records.get(i)
+ );
+ }
+ }
+
+ // Append to the batch.
+ for (SimpleRecord record : recordsToAppend) {
+ currentBatch.builder.append(record);
+ currentBatch.nextOffset++;
+ }
+ } catch (Throwable t) {
+ log.error("Replaying records to {} failed due to: {}.",
tp, t.getMessage());
+ // If an exception is thrown, we fail the entire batch. Exceptions should be
+ // really exceptional in this code path and they would usually be the result
+ // of bugs preventing records from being replayed.
+ failCurrentBatch(t);
+ }
+
+ // Write the current batch if it is transactional or if the linger timeout
+ // has expired.
+ // If flushing fails, we don't catch the exception in order to let
+ // the caller fail the current operation.
+ maybeFlushCurrentBatch(currentTimeMs);
+ }
+ }
+
+ /**
+ * Completes a transaction.
+ *
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param coordinatorEpoch The coordinator epoch of the transaction coordinator.
+ * @param result The transaction result.
+ * @param event The event that must be completed when the
+ * control record is written.
+ */
+ private void completeTransaction(
+ long producerId,
+ short producerEpoch,
+ int coordinatorEpoch,
+ TransactionResult result,
+ DeferredEvent event
+ ) {
+ if (state != CoordinatorState.ACTIVE) {
+ throw new IllegalStateException("Coordinator must be active to complete a transaction");
+ }
+
+ // The current batch must be written before the transaction marker is written
+ // in order to respect the order.
+ flushCurrentBatch();
+
+ long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ try {
+ coordinator.replayEndTransactionMarker(
+ producerId,
+ producerEpoch,
+ result
+ );
+
+ long offset = partitionWriter.append(
+ tp,
+ VerificationGuard.SENTINEL,
+ MemoryRecords.withEndTransactionMarker(
+ time.milliseconds(),
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(
+ result == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT,
+ coordinatorEpoch
+ )
+ )
+ );
+ coordinator.updateLastWrittenOffset(offset);
+
+ deferredEventQueue.add(offset, event);
+ } catch (Throwable t) {
+ coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
+ event.complete(t);
+ }
+ }
}
class OperationTimeout extends TimerTask {
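
Taken together, the methods added above implement a simple coalescing lifecycle:
append() opens a batch on demand, a linger TimerTask flushes it once the linger
time elapses, and transactional writes or transaction markers force an immediate
flush to preserve ordering. The following minimal, self-contained Java sketch
illustrates the same pattern outside of Kafka; every name in it is an
illustrative assumption, not an API from this patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Timer;
    import java.util.TimerTask;

    // Hedged sketch of the linger-based coalescing implemented above.
    final class LingerSketch {
        private final List<String> batch = new ArrayList<>();
        private final int lingerMs;
        private final Timer timer = new Timer(true);
        private TimerTask pendingFlush;

        LingerSketch(int lingerMs) {
            this.lingerMs = lingerMs;
        }

        synchronized void append(String record) {
            if (batch.isEmpty() && lingerMs > 0) {
                // First record of a new batch: arm the linger timeout,
                // mirroring lingerTimeoutTask in maybeAllocateNewBatch.
                pendingFlush = new TimerTask() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
                timer.schedule(pendingFlush, lingerMs);
            }
            batch.add(record);
            if (lingerMs == 0) {
                // Lingering disabled: flush every write immediately,
                // like the pre-patch behaviour.
                flush();
            }
        }

        synchronized void flush() {
            if (pendingFlush != null) {
                pendingFlush.cancel();
                pendingFlush = null;
            }
            if (!batch.isEmpty()) {
                // The real runtime calls partitionWriter.append(...) here and
                // completes the batched deferred events once the write commits.
                System.out.println("writing batch of " + batch.size() + " records");
                batch.clear();
            }
        }
    }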
@@ -781,100 +1254,20 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Execute the operation.
result = op.generateRecordsAndResult(context.coordinator.coordinator());
- if (result.records().isEmpty()) {
- // If the records are empty, it was a read operation after all. In this case,
- // the response can be returned directly iff there are no pending write operations;
- // otherwise, the read needs to wait on the last write operation to be completed.
- OptionalLong pendingOffset = context.deferredEventQueue.highestPendingOffset();
- if (pendingOffset.isPresent()) {
- context.deferredEventQueue.add(pendingOffset.getAsLong(), this);
- } else {
- complete(null);
- }
- } else {
- // If the records are not empty, first, they are applied to the state machine,
- // second, then are written to the partition/log, and finally, the response
- // is put into the deferred event queue.
- long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
- LogConfig logConfig = partitionWriter.config(tp);
- byte magic = logConfig.recordVersion().value;
- int maxBatchSize = logConfig.maxMessageSize();
- long currentTimeMs = time.milliseconds();
- ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
-
- try {
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
- buffer,
- magic,
- compression,
- TimestampType.CREATE_TIME,
- 0L,
- currentTimeMs,
- producerId,
- producerEpoch,
- 0,
- producerId != RecordBatch.NO_PRODUCER_ID,
- false,
- RecordBatch.NO_PARTITION_LEADER_EPOCH,
- maxBatchSize
- );
-
- // Apply the records to the state machine and add them to the batch.
- for (int i = 0; i < result.records().size(); i++) {
- U record = result.records().get(i);
-
- if (result.replayRecords()) {
- // We compute the offset of the record based on the last written offset. The
- // coordinator is the single writer to the underlying partition so we can
- // deduce it like this.
- context.coordinator.replay(
- prevLastWrittenOffset + i,
- producerId,
- producerEpoch,
- record
- );
- }
-
- byte[] keyBytes = serializer.serializeKey(record);
- byte[] valBytes = serializer.serializeValue(record);
-
- if (builder.hasRoomFor(currentTimeMs, keyBytes, valBytes, EMPTY_HEADERS)) {
- builder.append(
- currentTimeMs,
- keyBytes,
- valBytes,
- EMPTY_HEADERS
- );
- } else {
- throw new RecordTooLargeException("Message
batch size is " + builder.estimatedSizeInBytes() +
- " bytes in append to partition " + tp
+ " which exceeds the maximum " +
- "configured size of " + maxBatchSize +
".");
- }
- }
-
- // Write the records to the log and update the last written
- // offset.
- long offset = partitionWriter.append(
- tp,
- verificationGuard,
- builder.build()
- );
- context.coordinator.updateLastWrittenOffset(offset);
+ // Append the records and replay them to the state machine.
+ context.append(
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ result.records(),
+ result.replayRecords(),
+ this
+ );
- // Add the response to the deferred queue.
- if (!future.isDone()) {
- context.deferredEventQueue.add(offset, this);
- operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
- timer.add(operationTimeout);
- } else {
- complete(null);
- }
- } catch (Throwable t) {
- context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
- complete(t);
- } finally {
- context.bufferSupplier.release(buffer);
- }
+ // If the operation is not done, create an operation timeout.
+ if (!future.isDone()) {
+ operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
+ timer.add(operationTimeout);
}
});
} catch (Throwable t) {
@@ -1142,40 +1535,17 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
public void run() {
try {
withActiveContextOrThrow(tp, context -> {
- long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
+ context.completeTransaction(
+ producerId,
+ producerEpoch,
+ coordinatorEpoch,
+ result,
+ this
+ );
- try {
- context.coordinator.replayEndTransactionMarker(
- producerId,
- producerEpoch,
- result
- );
-
- long offset = partitionWriter.append(
- tp,
- VerificationGuard.SENTINEL,
- MemoryRecords.withEndTransactionMarker(
- time.milliseconds(),
- producerId,
- producerEpoch,
- new EndTransactionMarker(
- result == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT,
- coordinatorEpoch
- )
- )
- );
- context.coordinator.updateLastWrittenOffset(offset);
-
- if (!future.isDone()) {
- context.deferredEventQueue.add(offset, this);
- operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
- timer.add(operationTimeout);
- } else {
- complete(null);
- }
- } catch (Throwable t) {
- context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset);
- complete(t);
+ if (!future.isDone()) {
+ operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
+ timer.add(operationTimeout);
}
});
} catch (Throwable t) {
@@ -1449,6 +1819,12 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
private final Compression compression;
+ /**
+ * The duration in milliseconds that the coordinator will wait for writes to
+ * accumulate before flushing them to disk.
+ */
+ private final int appendLingerMs;
+
/**
* Atomic boolean indicating whether the runtime is running.
*/
@@ -1475,7 +1851,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
* @param coordinatorMetrics The coordinator metrics.
* @param serializer The serializer.
* @param compression The compression codec.
+ * @param appendLingerMs The append linger time in ms.
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
private CoordinatorRuntime(
String logPrefix,
LogContext logContext,
@@ -1489,7 +1867,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
CoordinatorRuntimeMetrics runtimeMetrics,
CoordinatorMetrics coordinatorMetrics,
Serializer<U> serializer,
- Compression compression
+ Compression compression,
+ int appendLingerMs
) {
this.logPrefix = logPrefix;
this.logContext = logContext;
@@ -1506,6 +1885,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.coordinatorMetrics = coordinatorMetrics;
this.serializer = serializer;
this.compression = compression;
+ this.appendLingerMs = appendLingerMs;
}
/**
@@ -1836,36 +2216,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
case FAILED:
case INITIAL:
context.transitionTo(CoordinatorState.LOADING);
- loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
- scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
- CoordinatorContext ctx = coordinators.get(tp);
- if (ctx != null) {
- if (ctx.state != CoordinatorState.LOADING) {
- log.info("Ignored load completion from {} because context is in {} state.",
- ctx.tp, ctx.state);
- return;
- }
- try {
- if (exception != null) throw exception;
- ctx.transitionTo(CoordinatorState.ACTIVE);
- if (summary != null) {
- runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
- log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms " +
- "was spent in the scheduler. Loaded {} records which total to {} bytes.",
- tp, partitionEpoch, summary.endTimeMs() - summary.startTimeMs(),
- summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes());
- }
- } catch (Throwable ex) {
- log.error("Failed to load metadata from {} with epoch {} due to {}.",
- tp, partitionEpoch, ex.toString());
- ctx.transitionTo(CoordinatorState.FAILED);
- }
- } else {
- log.debug("Failed to complete the loading of metadata for {} in epoch {} since the coordinator does not exist.",
- tp, partitionEpoch);
- }
- });
- });
break;
case LOADING:
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 03306c90407..b65ceda74ed 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -30,6 +30,7 @@ public class GroupCoordinatorConfigTest {
public void testConfigs() {
ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
GroupCoordinatorConfig config = new GroupCoordinatorConfig(
+ 10,
10,
30,
10,
@@ -65,6 +66,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs);
assertEquals(5000, config.offsetCommitTimeoutMs);
assertEquals(CompressionType.GZIP, config.compressionType);
+ assertEquals(10, config.appendLingerMs);
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
@@ -74,6 +76,7 @@ public class GroupCoordinatorConfigTest {
) {
return new GroupCoordinatorConfig(
1,
+ 10,
45,
5,
Integer.MAX_VALUE,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 6cd96458c64..7ddb04a6d8b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -118,6 +118,7 @@ public class GroupCoordinatorServiceTest {
private GroupCoordinatorConfig createConfig() {
return new GroupCoordinatorConfig(
1,
+ 10,
45,
5,
Integer.MAX_VALUE,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index ae1d4047924..5052881d3c0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
@@ -60,8 +61,8 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -74,6 +75,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE;
@@ -85,6 +87,8 @@ import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -147,7 +151,7 @@ public class CoordinatorRuntimeTest {
* when poll() is called.
*/
private static class ManualEventProcessor implements CoordinatorEventProcessor {
- private Deque<CoordinatorEvent> queue = new LinkedList<>();
+ private final Deque<CoordinatorEvent> queue = new LinkedList<>();
@Override
public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
@@ -274,9 +278,72 @@ public class CoordinatorRuntimeTest {
* A simple Coordinator implementation that stores the records into a set.
*/
static class MockCoordinatorShard implements CoordinatorShard<String> {
+ static class RecordAndMetadata {
+ public final long offset;
+ public final long producerId;
+ public final short producerEpoch;
+ public final String record;
+
+ public RecordAndMetadata(
+ long offset,
+ String record
+ ) {
+ this(
+ offset,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ record
+ );
+ }
+
+ public RecordAndMetadata(
+ long offset,
+ long producerId,
+ short producerEpoch,
+ String record
+ ) {
+ this.offset = offset;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.record = record;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RecordAndMetadata that = (RecordAndMetadata) o;
+
+ if (offset != that.offset) return false;
+ if (producerId != that.producerId) return false;
+ if (producerEpoch != that.producerEpoch) return false;
+ return Objects.equals(record, that.record);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (offset ^ (offset >>> 32));
+ result = 31 * result + (int) (producerId ^ (producerId >>> 32));
+ result = 31 * result + (int) producerEpoch;
+ result = 31 * result + (record != null ? record.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RecordAndMetadata(" +
+ "offset=" + offset +
+ ", producerId=" + producerId +
+ ", producerEpoch=" + producerEpoch +
+ ", record='" + record.substring(0, 10) + '\'' +
+ ')';
+ }
+ }
+
private final SnapshotRegistry snapshotRegistry;
- private final TimelineHashSet<String> records;
- private final TimelineHashMap<Long, TimelineHashSet<String>> pendingRecords;
+ private final TimelineHashSet<RecordAndMetadata> records;
+ private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>> pendingRecords;
private final CoordinatorTimer<Void, String> timer;
MockCoordinatorShard(
@@ -296,12 +363,19 @@ public class CoordinatorRuntimeTest {
short producerEpoch,
String record
) throws RuntimeException {
+ RecordAndMetadata recordAndMetadata = new RecordAndMetadata(
+ offset,
+ producerId,
+ producerEpoch,
+ record
+ );
+
if (producerId == RecordBatch.NO_PRODUCER_ID) {
- records.add(record);
+ records.add(recordAndMetadata);
} else {
pendingRecords
.computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0))
- .add(record);
+ .add(recordAndMetadata);
}
}
@@ -312,7 +386,7 @@ public class CoordinatorRuntimeTest {
TransactionResult result
) throws RuntimeException {
if (result == TransactionResult.COMMIT) {
- TimelineHashSet<String> pending = pendingRecords.remove(producerId);
+ TimelineHashSet<RecordAndMetadata> pending = pendingRecords.remove(producerId);
if (pending == null) return;
records.addAll(pending);
} else {
@@ -321,13 +395,26 @@ public class CoordinatorRuntimeTest {
}
Set<String> pendingRecords(long producerId) {
- TimelineHashSet<String> pending = pendingRecords.get(producerId);
+ TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
if (pending == null) return Collections.emptySet();
- return Collections.unmodifiableSet(new HashSet<>(pending));
+ return Collections.unmodifiableSet(
+ pending.stream().map(record -> record.record).collect(Collectors.toSet())
+ );
}
Set<String> records() {
- return Collections.unmodifiableSet(new HashSet<>(records));
+ return Collections.unmodifiableSet(
+ records.stream().map(record -> record.record).collect(Collectors.toSet())
+ );
+ }
+
+ List<RecordAndMetadata> fullRecords() {
+ return Collections.unmodifiableList(
+ records
+ .stream()
+ .sorted(Comparator.comparingLong(record -> record.offset))
+ .collect(Collectors.toList())
+ );
}
CoordinatorTimer<Void, String> timer() {
@@ -407,10 +494,17 @@ public class CoordinatorRuntimeTest {
long timestamp,
String... records
) {
- if (records.length == 0)
+ return records(timestamp, Arrays.stream(records).collect(Collectors.toList()));
+ }
+
+ private static MemoryRecords records(
+ long timestamp,
+ List<String> records
+ ) {
+ if (records.isEmpty())
return MemoryRecords.EMPTY;
- List<SimpleRecord> simpleRecords = Arrays.stream(records).map(record ->
+ List<SimpleRecord> simpleRecords = records.stream().map(record ->
new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
).collect(Collectors.toList());
@@ -447,10 +541,24 @@ public class CoordinatorRuntimeTest {
long timestamp,
String... records
) {
- if (records.length == 0)
+ return transactionalRecords(
+ producerId,
+ producerEpoch,
+ timestamp,
+ Arrays.stream(records).collect(Collectors.toList())
+ );
+ }
+
+ private static MemoryRecords transactionalRecords(
+ long producerId,
+ short producerEpoch,
+ long timestamp,
+ List<String> records
+ ) {
+ if (records.isEmpty())
return MemoryRecords.EMPTY;
- List<SimpleRecord> simpleRecords = Arrays.stream(records).map(record ->
+ List<SimpleRecord> simpleRecords = records.stream().map(record ->
new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
).collect(Collectors.toList());
@@ -986,13 +1094,13 @@ public class CoordinatorRuntimeTest {
// Records have been replayed to the coordinator.
assertEquals(mkSet("record1", "record2"),
ctx.coordinator.coordinator().records());
// Records have been written to the log.
- assertEquals(Arrays.asList(
+ assertEquals(Collections.singletonList(
records(timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
- state -> new CoordinatorResult<>(Arrays.asList("record3"), "response2"));
+ state -> new CoordinatorResult<>(Collections.singletonList("record3"), "response2"));
// Verify that the write is not committed yet.
assertFalse(write2.isDone());
@@ -1540,7 +1648,7 @@ public class CoordinatorRuntimeTest {
100L
));
// Records have been written to the log.
- assertEquals(Arrays.asList(
+ assertEquals(Collections.singletonList(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@@ -1785,7 +1893,7 @@ public class CoordinatorRuntimeTest {
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
- assertEquals(Arrays.asList(
+ assertEquals(Collections.singletonList(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
@@ -1807,7 +1915,7 @@ public class CoordinatorRuntimeTest {
assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
- assertEquals(Arrays.asList(
+ assertEquals(Collections.singletonList(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
), writer.entries(TP));
}
@@ -1985,7 +2093,7 @@ public class CoordinatorRuntimeTest {
// Read.
List<CompletableFuture<List<String>>> responses = runtime.scheduleReadAllOperation(
"read",
- (state, offset) -> new ArrayList<>(state.records)
+ (state, offset) -> new ArrayList<>(state.records())
);
assertEquals(
@@ -3059,6 +3167,594 @@ public class CoordinatorRuntimeTest {
assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
}
+ @Test
+ public void testScheduleWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in mind that
+ // each batch has a header so it is not possible to have those four records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1 with two records.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 2), "response1")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write1.isDone());
+
+ // A batch has been created.
+ assertNotNull(ctx.currentBatch);
+
+ // Verify the state. Records are replayed but no batch written.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Write #2 with one record.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(2, 3), "response2")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write2.isDone());
+
+ // Verify the state. Records are replayed but no batch written.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Write #3 with one record. This one cannot go into the existing batch
+ // so the existing batch should be flushed and a new one should be created.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(3, 4), "response3")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write3.isDone());
+
+ // Verify the state. Records are replayed. The previous batch
+ // got flushed with all the records but the new one from #3.
+ assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+ new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.singletonList(
+ records(timer.time().milliseconds(), records.subList(0, 3))
+ ), writer.entries(TP));
+
+ // Advance past the linger time.
+ timer.advanceClock(11);
+
+ // Verify the state. The pending batch is flushed.
+ assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+ new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+ records(timer.time().milliseconds() - 11, records.subList(3, 4))
+ ), writer.entries(TP));
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertTrue(write3.isDone());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in mind that
+ // each batch has a header so it is not possible to have those four records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write all the records.
+ CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ assertFutureThrows(write, RecordTooLargeException.class);
+ }
+
+ @Test
+ public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
+ MockTimer timer = new MockTimer();
+ // The partition writer does not accept any writes.
+ MockPartitionWriter writer = new MockPartitionWriter(0);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in mind that
+ // each batch has a header so it is not possible to have those four records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+ // Write #2.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+ // Write #3.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));
+
+ // Verify the state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Write #4. This write cannot fit in the current batch, so the current batch
+ // is flushed. The flush fails, so we expect all writes to fail.
+ CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));
+
+ // Verify the futures.
+ assertFutureThrows(write1, KafkaException.class);
+ assertFutureThrows(write2, KafkaException.class);
+ assertFutureThrows(write3, KafkaException.class);
+ // Write #4 is also expected to fail.
+ assertFutureThrows(write4, KafkaException.class);
+
+ // Verify the state. The state should be reverted to the initial state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+ }
+
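+ // A hypothetical helper (sketch, not part of this patch) showing the failure
+ // semantics verified above: when the flush of a coalesced batch fails, every
+ // write buffered in that batch completes exceptionally with the same error,
+ // and the state machine is reverted to the last snapshot.
+ private static void failBufferedWrites(
+ List<CompletableFuture<String>> bufferedWrites,
+ Throwable flushError
+ ) {
+ bufferedWrites.forEach(future -> future.completeExceptionally(flushError));
+ }
+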
+ @Test
+ public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Override the coordinator with a coordinator that throws
+ // an exception when replay is called.
+ SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+ ctx.coordinator = new SnapshottableCoordinator<>(
+ new LogContext(),
+ snapshotRegistry,
+ new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+ @Override
+ public void replay(
+ long offset,
+ long producerId,
+ short producerEpoch,
+ String record
+ ) throws RuntimeException {
+ if (offset >= 1) {
+ throw new IllegalArgumentException("error");
+ }
+ super.replay(
+ offset,
+ producerId,
+ producerEpoch,
+ record
+ );
+ }
+ },
+ TP
+ );
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each.
+ List<String> records = Stream.of('1', '2').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+ // Verify the state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.singletonList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Write #2. It should fail.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+ // Verify the futures.
+ assertFutureThrows(write1, IllegalArgumentException.class);
+ assertFutureThrows(write2, IllegalArgumentException.class);
+
+ // Verify the state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+ }
+
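+ // A hypothetical helper (sketch, not part of this patch) mirroring the
+ // all-or-nothing replay semantics verified above: the list stands in for the
+ // coordinator state machine, and any replay failure truncates it back to its
+ // pre-replay size, just like the snapshot-registry revert.
+ private static void replayAllOrNothing(List<String> state, List<String> records) {
+ int snapshotSize = state.size();
+ try {
+ for (String record : records) {
+ if (record.isEmpty()) throw new IllegalArgumentException("error");
+ state.add(record);
+ }
+ } catch (RuntimeException e) {
+ state.subList(snapshotSize, state.size()).clear(); // revert to the snapshot
+ throw e;
+ }
+ }
+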
+ @Test
+ public void testScheduleTransactionalWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Write #1 with one record.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write1.isDone());
+
+ // Verify the state. Records are replayed but no batch written.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1"), ctx.coordinator.coordinator().records());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Transactional write #2 with one record. This will flush the current batch.
+ CompletableFuture<String> write2 = runtime.scheduleTransactionalWriteOperation(
+ "txn-write#1",
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 50,
+ Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+ TXN_OFFSET_COMMIT_LATEST_VERSION
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write2.isDone());
+
+ // Verify the state. The current batch and the transactional records are
+ // written to the log.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1"), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds(), "record#1"),
+ transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
+ ), writer.entries(TP));
+
+ // Write #3 with one record.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write3.isDone());
+
+ // Verify the state.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1", "record#3"), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds(), "record#1"),
+ transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
+ ), writer.entries(TP));
+
+ // Complete transaction #1. It will flush the current batch if any.
+ CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+ "complete#1",
+ TP,
+ 100L,
+ (short) 50,
+ 10,
+ TransactionResult.COMMIT,
+ DEFAULT_WRITE_TIMEOUT
+ );
+
+ // Verify that the completion is not committed yet.
+ assertFalse(complete1.isDone());
+
+ // Verify the state.
+ assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds(), "record#1"),
+ transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"),
+ records(timer.time().milliseconds(), "record#3"),
+ endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 10, ControlRecordType.COMMIT)
+ ), writer.entries(TP));
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertTrue(write3.isDone());
+ assertTrue(complete1.isDone());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+ assertNull(complete1.get(5, TimeUnit.SECONDS));
+ }
+
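+ // A hypothetical predicate (sketch, not part of this patch) capturing the
+ // ordering rule exercised above: transactional records carry their own
+ // producer id and epoch, so they cannot share a batch with lingering
+ // non-transactional records, and any pending batch is flushed before the
+ // transactional append reaches the log.
+ private static boolean mustFlushBeforeTransactionalAppend(boolean currentBatchIsEmpty) {
+ return !currentBatchIsEmpty; // flush whatever is lingering first
+ }
+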
+ @Test
+ public void testStateMachineIsReloadedWhenOutOfSync() {
+ MockTimer timer = new MockTimer();
+ MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
+ MockPartitionWriter writer = new MockPartitionWriter() {
+ @Override
+ public long append(
+ TopicPartition tp,
+ VerificationGuard verificationGuard,
+ MemoryRecords batch
+ ) {
+ // Add 1 to the returned offsets.
+ return super.append(tp, verificationGuard, batch) + 1;
+ }
+ };
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(loader)
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(ACTIVE, ctx.state);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Keep a reference to the current coordinator.
+ SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = ctx.coordinator;
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in mind that
+ // each batch has a header, so it is not possible to fit those four records
+ // into a single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+ // Write #2.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+ // Write #3.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));
+
+ // Write #4. This write cannot fit in the current batch, so the current batch
+ // is flushed. The flush fails because the writer returns shifted offsets,
+ // so we expect all writes to fail.
+ CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));
+
+ // Verify the futures.
+ assertFutureThrows(write1, NotCoordinatorException.class);
+ assertFutureThrows(write2, NotCoordinatorException.class);
+ assertFutureThrows(write3, NotCoordinatorException.class);
+ // Write #4 is also expected to fail.
+ assertFutureThrows(write4, NotCoordinatorException.class);
+
+ // Verify that the state machine was loaded twice.
+ verify(loader, times(2)).load(eq(TP), any());
+
+ // Verify that the state is active and that the state machine
+ // is actually a new one.
+ assertEquals(ACTIVE, ctx.state);
+ assertNotEquals(coordinator, ctx.coordinator);
+ }
+
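+ // A hypothetical check (sketch, not part of this patch) for the out-of-sync
+ // detection exercised above: the MockPartitionWriter shifts returned offsets
+ // by one, the runtime's expected end offset no longer matches the log, and
+ // the coordinator is unloaded and reloaded, hence the two load() invocations.
+ private static boolean isOutOfSync(long expectedEndOffset, long returnedEndOffset) {
+ return expectedEndOffset != returnedEndOffset;
+ }
+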
private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
index ec6e8b3d783..ac58b68ec5e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
@@ -32,6 +32,10 @@ public abstract class TimerTask implements Runnable {
}
}
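+ /**
+ * Returns true if this task is not linked to a timer task entry, i.e. it
+ * has been cancelled or was never scheduled.
+ */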
+ public boolean isCancelled() {
+ return timerTaskEntry == null;
+ }
+
final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task entry,