This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 699ae1b75b0  KAFKA-16729: Support isolation level for share consumer (#19261)
699ae1b75b0 is described below
commit 699ae1b75b0f94601f99b389dd930c2910054c38
Author: Abhinav Dixit <[email protected]>
AuthorDate: Thu Apr 10 13:30:03 2025 +0530
KAFKA-16729: Support isolation level for share consumer (#19261)
    This PR adds the share group dynamic config `share.isolation.level`.
    Until now, share groups only supported the `READ_UNCOMMITTED` isolation
    level. With this PR, we add support for the `READ_COMMITTED` isolation
    level for share groups.

    Reviewers: Andrew Schofield <[email protected]>, Jun Rao <[email protected]>, Apoorv Mittal <[email protected]>
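
    For illustration only (not part of this commit), a minimal sketch of
    enabling the new behaviour from an application, using the same Admin API
    calls as the alterShareIsolationLevel test helper added below; the
    bootstrap address and group name here are assumptions:

        import java.util.List;
        import java.util.Map;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.AlterConfigOp;
        import org.apache.kafka.clients.admin.ConfigEntry;
        import org.apache.kafka.common.config.ConfigResource;

        public class SetShareIsolationLevel {
            public static void main(String[] args) throws Exception {
                // Assumed bootstrap address; replace with your cluster.
                try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
                    // GROUP config resource targets the share group's dynamic configs.
                    ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "group1");
                    // Set share.isolation.level=read_committed so the group skips aborted transactional records.
                    AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("share.isolation.level", "read_committed"),
                        AlterConfigOp.OpType.SET);
                    admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
                }
            }
        }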
---
checkstyle/suppressions.xml | 3 +
.../kafka/clients/consumer/ShareConsumerTest.java | 439 +++++++++++++++++-
.../java/kafka/server/share/ShareFetchUtils.java | 3 +-
.../java/kafka/server/share/SharePartition.java | 240 +++++++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../kafka/server/share/DelayedShareFetchTest.java | 26 +-
.../kafka/server/share/ShareFetchUtilsTest.java | 20 +-
.../server/share/SharePartitionManagerTest.java | 16 +-
.../kafka/server/share/SharePartitionTest.java | 512 +++++++++++++++++++--
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../kafka/coordinator/group/GroupConfig.java | 40 ++
.../kafka/coordinator/group/GroupConfigTest.java | 28 ++
12 files changed, 1232 insertions(+), 102 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 59aec7b9350..429d699e7a1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -168,6 +168,9 @@
<suppress checks="NPathComplexity"
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
+ <suppress checks="ClassFanOutComplexity"
+ files="ShareConsumerTest.java"/>
+
<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 6f3b292e0d9..8d9e4dbd610 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -83,6 +83,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -102,7 +103,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -325,12 +325,12 @@ public class ShareConsumerTest {
// Waiting for heartbeat to propagate the subscription change.
TestUtils.waitForCondition(() -> {
shareConsumer.poll(Duration.ofMillis(500));
-                return partitionExceptionMap.containsKey(tp) && partitionExceptionMap.containsKey(tp2);
+                return partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.containsKey(tp2);
            }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records from the updated subscription");
            // Verifying if the callback was invoked without exceptions for the partitions for both topics.
-            assertNull(partitionExceptionMap.get(tp));
-            assertNull(partitionExceptionMap.get(tp2));
+            assertFalse(partitionExceptionMap.containsKey(tp));
+            assertFalse(partitionExceptionMap.containsKey(tp2));
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -356,11 +356,11 @@ public class ShareConsumerTest {
TestUtils.waitForCondition(() -> {
shareConsumer.poll(Duration.ofMillis(500));
-                return partitionExceptionMap.containsKey(tp);
+                return partitionOffsetsMap.containsKey(tp);
            }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback");
-            // We expect null exception as the acknowledgment error code is null.
-            assertNull(partitionExceptionMap.get(tp));
+            // We expect no exception as the acknowledgment error code is null.
+            assertFalse(partitionExceptionMap.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -388,9 +388,8 @@ public class ShareConsumerTest {
shareConsumer.poll(Duration.ofMillis(1000));
shareConsumer.close();
-            // We expect null exception as the acknowledgment error code is null.
-            assertTrue(partitionExceptionMap.containsKey(tp));
-            assertNull(partitionExceptionMap.get(tp));
+            // We expect no exception as the acknowledgment error code is null.
+            assertFalse(partitionExceptionMap.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -423,6 +422,13 @@ public class ShareConsumerTest {
}
}
+    /**
+     * Test implementation of AcknowledgementCommitCallback to track the completed acknowledgements.
+     * partitionOffsetsMap is used to track the offsets acknowledged for each partition.
+     * partitionExceptionMap is used to track the exception encountered for each partition, if any.
+     * Note - Multiple calls to {@link #onComplete(Map, Exception)} will not update the partitionExceptionMap for any existing partitions,
+     * so please ensure to clear the partitionExceptionMap after every call to onComplete() in a single test.
+     */
    private static class TestableAcknowledgementCommitCallback implements AcknowledgementCommitCallback {
private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
private final Map<TopicPartition, Exception> partitionExceptionMap;
@@ -442,7 +448,7 @@ public class ShareConsumerTest {
mergedOffsets.addAll(newOffsets);
return mergedOffsets;
});
-                if (!partitionExceptionMap.containsKey(partition.topicPartition())) {
+                if (!partitionExceptionMap.containsKey(partition.topicPartition()) && exception != null) {
                    partitionExceptionMap.put(partition.topicPartition(), exception);
                }
            });
@@ -676,10 +682,10 @@ public class ShareConsumerTest {
            // The callback will receive the acknowledgement responses asynchronously after the next poll.
TestUtils.waitForCondition(() -> {
shareConsumer1.poll(Duration.ofMillis(1000));
- return partitionExceptionMap1.containsKey(tp);
+ return partitionOffsetsMap1.containsKey(tp);
}, 30000, 100L, () -> "Didn't receive call to callback");
- assertNull(partitionExceptionMap1.get(tp));
+ assertFalse(partitionExceptionMap1.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -693,8 +699,7 @@ public class ShareConsumerTest {
shareConsumer.subscribe(Set.of(tp.topic()));
            Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
-            Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
-            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
+            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, Map.of()));
// The acknowledgement mode moves to PENDING from UNKNOWN.
            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
@@ -715,7 +720,7 @@ public class ShareConsumerTest {
TestUtils.waitForCondition(() -> {
shareConsumer.poll(Duration.ofMillis(500));
- return partitionExceptionMap1.containsKey(tp);
+ return partitionOffsetsMap1.containsKey(tp);
}, 30000, 100L, () -> "Didn't receive call to callback");
verifyShareGroupStateTopicRecordsProduced();
}
@@ -778,8 +783,7 @@ public class ShareConsumerTest {
shareConsumer1.close();
- assertTrue(partitionExceptionMap.containsKey(tp));
- assertNull(partitionExceptionMap.get(tp));
+ assertFalse(partitionExceptionMap.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -952,10 +956,10 @@ public class ShareConsumerTest {
            // The callback will receive the acknowledgement responses after the next poll.
            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(1000));
-                return partitionExceptionMap1.containsKey(tp);
+                return partitionOffsetsMap1.containsKey(tp);
            }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit callback did not receive the response yet");
- assertNull(partitionExceptionMap1.get(tp));
+ assertFalse(partitionExceptionMap1.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -2090,6 +2094,335 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
+    @ClusterTest
+    public void testReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 5000L, 8);
+            // The transactions for the 5th and 10th messages were aborted, hence they won't be included in the fetched records.
+            assertEquals(8, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                if (messageCounter % 5 == 0)
+                    messageCounter++;
+                assertEquals("Message " + messageCounter, new String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testReadUncommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 5000L, 10);
+            // Even though the transactions for the 5th and 10th messages were aborted, they will be included in the
+            // fetched records since the isolation level is READ_UNCOMMITTED.
+            assertEquals(10, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                assertEquals("Message " + messageCounter, new String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 1");
+
+                ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                records = waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 2", new String(record.value()));
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                records = waitedPoll(shareConsumer, 5000L, 2);
+                // Message 3 and Message 4 would be returned by this poll.
+                assertEquals(2, records.count());
+                Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator();
+                record = recordIterator.next();
+                assertEquals("Message 3", new String(record.value()));
+                record = recordIterator.next();
+                assertEquals("Message 4", new String(record.value()));
+                // We will make Message 3 and Message 4 available for re-consumption.
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+                shareConsumer.commitSync();
+
+                // We are altering the isolation level to READ_COMMITTED now. We will only read committed transactions now.
+                alterShareIsolationLevel("group1", "read_committed");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 8");
+
+                // Since the isolation level is READ_COMMITTED, we can consume Message 3 (committed transaction that was released), Message 5 and Message 8.
+                // We cannot consume Message 4 (aborted transaction that was released), Message 6 and Message 7 since they were aborted.
+                List<String> messages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : pollRecords)
+                            messages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return messages.size() == 3;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all records post altering share isolation level");
+
+                assertEquals("Message 3", messages.get(0));
+                assertEquals("Message 5", messages.get(1));
+                assertEquals("Message 8", messages.get(2));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 1");
+
+                ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                // Setting the acknowledgement commit callback to verify acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, Map.of()));
+
+                // We will not receive any records since the transaction for Message 2 was aborted. Wait for the
+                // aborted marker offset for Message 2 (3L) to be fetched and acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(500));
+                    return pollRecords.count() == 0 && partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.get(tp).contains(3L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort transaction and marker offset for Message 2");
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                // Setting the acknowledgement commit callback to verify acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap2 = new HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap2, Map.of()));
+
+                records = waitedPoll(shareConsumer, 5000L, 1);
+                // Message 3 would be returned by this poll.
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 3", new String(record.value()));
+                // We will make Message 3 available for re-consumption.
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+                shareConsumer.commitSync();
+
+                // Wait for the aborted marker offset for Message 4 (7L) to be fetched and acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    shareConsumer.poll(Duration.ofMillis(500));
+                    return partitionOffsetsMap2.containsKey(tp) && partitionOffsetsMap2.get(tp).contains(7L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort transaction marker offset for Message 4");
+
+                // We are altering the isolation level to READ_UNCOMMITTED now. We will read both committed and aborted transactions now.
+                alterShareIsolationLevel("group1", "read_uncommitted");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 8");
+
+                // Since the isolation level is READ_UNCOMMITTED, we can consume Message 3 (committed transaction that was released), Message 5, Message 6, Message 7 and Message 8.
+                List<String> finalMessages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : pollRecords)
+                            finalMessages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return finalMessages.size() == 5;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all records post altering share isolation level");
+
+                assertEquals("Message 3", finalMessages.get(0));
+                assertEquals("Message 5", finalMessages.get(1));
+                assertEquals("Message 6", finalMessages.get(2));
+                assertEquals("Message 7", finalMessages.get(3));
+                assertEquals("Message 8", finalMessages.get(4));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 1");
+
+                ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                // Setting the acknowledgement commit callback to verify acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, Map.of()));
+
+                // We will not receive any records since the transaction for Message 2 was aborted. Wait for the
+                // aborted marker offset for Message 2 (3L) to be fetched and acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(500));
+                    return pollRecords.count() == 0 && partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.get(tp).contains(3L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort transaction and marker offset for Message 2");
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                // Setting the acknowledgement commit callback to verify acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap2 = new HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap2, Map.of()));
+
+                records = waitedPoll(shareConsumer, 5000L, 1);
+                // Message 3 would be returned by this poll.
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 3", new String(record.value()));
+                // We reject Message 3, so it will not be available for re-consumption.
+                records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.REJECT));
+                shareConsumer.commitSync();
+
+                // Wait for the aborted marker offset for Message 4 (7L) to be fetched and acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    shareConsumer.poll(Duration.ofMillis(500));
+                    return partitionOffsetsMap2.containsKey(tp) && partitionOffsetsMap2.get(tp).contains(7L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort transaction marker offset for Message 4");
+
+                // We are altering the isolation level to READ_UNCOMMITTED now. We will read both committed and aborted transactions now.
+                alterShareIsolationLevel("group1", "read_uncommitted");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 8");
+
+                // Since the isolation level is READ_UNCOMMITTED, we can consume Message 5, Message 6, Message 7 and Message 8.
+                List<String> finalMessages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : pollRecords)
+                            finalMessages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return finalMessages.size() == 4;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all records post altering share isolation level");
+
+                assertEquals("Message 5", finalMessages.get(0));
+                assertEquals("Message 6", finalMessages.get(1));
+                assertEquals("Message 7", finalMessages.get(2));
+                assertEquals("Message 8", finalMessages.get(3));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
/**
* Util class to encapsulate state for a consumer/producer
* being executed by an {@link ExecutorService}.
@@ -2127,6 +2460,59 @@ public class ShareConsumerTest {
}
}
+    private void produceCommittedTransaction(Producer<byte[], byte[]> transactionalProducer, String message) {
+        try {
+            transactionalProducer.beginTransaction();
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, message.getBytes(), message.getBytes());
+            Future<RecordMetadata> future = transactionalProducer.send(record);
+            transactionalProducer.flush();
+            future.get(); // Ensure producer send is complete before committing
+            transactionalProducer.commitTransaction();
+        } catch (Exception e) {
+            transactionalProducer.abortTransaction();
+        }
+    }
+
+    private void produceAbortedTransaction(Producer<byte[], byte[]> transactionalProducer, String message) {
+        try {
+            transactionalProducer.beginTransaction();
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, message.getBytes(), message.getBytes());
+            transactionalProducer.send(record);
+            transactionalProducer.flush();
+            transactionalProducer.abortTransaction();
+        } catch (Exception e) {
+            transactionalProducer.abortTransaction();
+        }
+    }
+
+    private void produceCommittedAndAbortedTransactionsInInterval(Producer<byte[], byte[]> transactionalProducer, int messageCount, int intervalAbortedTransactions) {
+        transactionalProducer.initTransactions();
+        int transactionCount = 0;
+        try {
+            for (int i = 0; i < messageCount; i++) {
+                transactionalProducer.beginTransaction();
+                String recordMessage = "Message " + (i + 1);
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, recordMessage.getBytes(), recordMessage.getBytes());
+                Future<RecordMetadata> future = transactionalProducer.send(record);
+                transactionalProducer.flush();
+                // Increment the transaction count.
+                transactionCount++;
+                if (transactionCount % intervalAbortedTransactions == 0) {
+                    // Abort every intervalAbortedTransactions-th transaction.
+                    transactionalProducer.abortTransaction();
+                } else {
+                    // Commit all other transactions.
+                    future.get(); // Ensure producer send is complete before committing
+                    transactionalProducer.commitTransaction();
+                }
+            }
+        } catch (Exception e) {
+            transactionalProducer.abortTransaction();
+        } finally {
+            transactionalProducer.close();
+        }
+    }
+
private int consumeMessages(AtomicInteger totalMessagesConsumed,
int totalMessages,
String groupId,
@@ -2326,6 +2712,19 @@ public class ShareConsumerTest {
}
}
+    private void alterShareIsolationLevel(String groupId, String newValue) {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
+            GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, newValue), AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try (Admin adminClient = createAdminClient()) {
+            assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                .all()
+                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+        }
+    }
+
/**
     * Test utility which encapsulates a {@link ShareConsumer} whose record processing
     * behavior can be supplied as a function argument.
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index 19b6977aec9..3cfab25e684 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -114,7 +114,8 @@ public class ShareFetchUtils {
                shareFetch.batchSize(),
                shareFetch.maxFetchRecords() - acquiredRecordsCount,
                shareFetchPartitionData.fetchOffset(),
-                fetchPartitionData
+                fetchPartitionData,
+                shareFetch.fetchParams().isolation
            );
            log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
            // Maybe, in the future, check if no records are acquired, and we want to retry
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 8650e4cc2be..db2ff4fc8cf 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -31,8 +31,11 @@ import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
@@ -54,6 +57,7 @@ import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.ReadShareGroupStateParameters;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
+import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
@@ -63,12 +67,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,7 +94,7 @@ import static kafka.server.share.ShareFetchUtils.recordLockDurationMsOrDefault;
 * consumers. The class maintains the state of the records that have been fetched from the leader
 * and are in-flight.
 */
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
public class SharePartition {
    private static final Logger log = LoggerFactory.getLogger(SharePartition.class);
@@ -686,6 +695,7 @@ public class SharePartition {
     *                           if the records are already part of the same fetch batch.
     * @param fetchOffset        The fetch offset for which the records are fetched.
     * @param fetchPartitionData The fetched records for the share partition.
+    * @param isolationLevel     The isolation level for the share fetch request.
     * @return The acquired records for the share partition.
     */
    @SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
@@ -694,7 +704,8 @@ public class SharePartition {
        int batchSize,
        int maxFetchRecords,
        long fetchOffset,
-        FetchPartitionData fetchPartitionData
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel
    ) {
        log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
        if (stateNotActive() || maxFetchRecords <= 0) {
@@ -739,8 +750,9 @@ public class SharePartition {
            if (subMap.isEmpty()) {
                log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                    groupId, topicIdPartition);
-                return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                    firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
            }
            log.trace("Overlap exists with in-flight records. Acquire the records if available for"
@@ -850,6 +862,7 @@ public class SharePartition {
            }
            if (!result.isEmpty()) {
                maybeUpdateReadGapFetchOffset(result.get(result.size() - 1).lastOffset() + 1);
+                return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, new ShareAcquiredRecords(result, acquiredCount));
            }
            return new ShareAcquiredRecords(result, acquiredCount);
        } finally {
@@ -1169,7 +1182,7 @@ public class SharePartition {
        // Though such batches can be removed from the cache, but it is better to archive them so
        // that they are never acquired again.
-        boolean anyRecordArchived = archiveAvailableRecords(fetchOffset, baseOffset, subMap);
+        boolean anyRecordArchived = archiveRecords(fetchOffset, baseOffset, subMap, RecordState.AVAILABLE);
        // If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED,
        // then there is a chance that the next fetch offset can change.
@@ -1190,21 +1203,22 @@ public class SharePartition {
private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) {
lock.writeLock().lock();
try {
-            return archiveAvailableRecords(startOffset, logStartOffset, cachedState);
+            return archiveRecords(startOffset, logStartOffset, cachedState, RecordState.AVAILABLE);
} finally {
lock.writeLock().unlock();
}
}
/**
-     * The method archive the available records in the given map that are before the end offset.
+     * The method archives the records in a given state in the map that are before the end offset.
     *
-     * @param startOffset The offset from which the available records should be archived.
-     * @param endOffset The offset before which the available records should be archived.
+     * @param startOffset The offset from which the records should be archived.
+     * @param endOffset The offset before which the records should be archived.
     * @param map The map containing the in-flight records.
+     * @param initialState The initial state of the records to be archived.
     * @return A boolean which indicates whether any record is archived or not.
     */
-    private boolean archiveAvailableRecords(long startOffset, long endOffset, NavigableMap<Long, InFlightBatch> map) {
+    private boolean archiveRecords(long startOffset, long endOffset, NavigableMap<Long, InFlightBatch> map, RecordState initialState) {
lock.writeLock().lock();
try {
boolean isAnyOffsetArchived = false, isAnyBatchArchived = false;
@@ -1225,16 +1239,16 @@ public class SharePartition {
groupId, topicIdPartition);
if (inFlightBatch.offsetState() == null) {
-                    if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
+                    if (inFlightBatch.batchState() != initialState) {
                        continue;
                    }
                    inFlightBatch.maybeInitializeOffsetStateUpdate();
                }
-                isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1);
+                isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);
                continue;
            }
            // The in-flight batch is a full match hence change the state of the complete batch.
-            isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch);
+            isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);
}
return isAnyOffsetArchived || isAnyBatchArchived;
} finally {
@@ -1244,7 +1258,9 @@ public class SharePartition {
private boolean archivePerOffsetBatchRecords(InFlightBatch inFlightBatch,
long startOffsetToArchive,
- long endOffsetToArchive) {
+ long endOffsetToArchive,
+ RecordState initialState
+ ) {
lock.writeLock().lock();
try {
boolean isAnyOffsetArchived = false;
@@ -1257,11 +1273,14 @@ public class SharePartition {
// No further offsets to process.
break;
}
- if (offsetState.getValue().state != RecordState.AVAILABLE) {
+ if (offsetState.getValue().state != initialState) {
continue;
}
offsetState.getValue().archive(EMPTY_MEMBER_ID);
+ if (initialState == RecordState.ACQUIRED) {
+                    offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
+ }
isAnyOffsetArchived = true;
}
return isAnyOffsetArchived;
@@ -1270,13 +1289,16 @@ public class SharePartition {
}
}
- private boolean archiveCompleteBatch(InFlightBatch inFlightBatch) {
+    private boolean archiveCompleteBatch(InFlightBatch inFlightBatch, RecordState initialState) {
        lock.writeLock().lock();
        try {
            log.trace("Archiving complete batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
-            if (inFlightBatch.batchState() == RecordState.AVAILABLE) {
+            if (inFlightBatch.batchState() == initialState) {
                // Change the state of complete batch since the same state exists for the entire inFlight batch.
                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
+                if (initialState == RecordState.ACQUIRED) {
+                    inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
+                }
+ }
return true;
}
} finally {
@@ -2475,6 +2497,192 @@ public class SharePartition {
}
}
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel,
+        ShareAcquiredRecords shareAcquiredRecords
+    ) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty() || fetchPartitionData.abortedTransactions.get().isEmpty())
+            return shareAcquiredRecords;
+
+        // When FetchIsolation.TXN_COMMITTED is used as isolation level by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+            shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+            archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() + 1, subMap, RecordState.ACQUIRED);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    }
+
+    /**
+     * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
+     * It follows an iterative refinement of acquired records to eliminate batches to be archived.
+     * @param acquiredRecordsList The list containing acquired records. This list is sorted by the firstOffset of the acquired batch.
+     * @param batchesToArchive The list containing record batches to archive. This list is sorted by the baseOffset of the record batch.
+     * @return The list containing filtered acquired records offsets.
+     */
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecordsList,
+        List<RecordBatch> batchesToArchive
+    ) {
+        Iterator<RecordBatch> batchesToArchiveIterator = batchesToArchive.iterator();
+        if (!batchesToArchiveIterator.hasNext())
+            return acquiredRecordsList;
+        List<AcquiredRecords> result = new ArrayList<>();
+        Iterator<AcquiredRecords> acquiredRecordsListIter = acquiredRecordsList.iterator();
+        RecordBatch batchToArchive = batchesToArchiveIterator.next();
+        AcquiredRecords unresolvedAcquiredRecords = null;
+
+        while (unresolvedAcquiredRecords != null || acquiredRecordsListIter.hasNext()) {
+            if (unresolvedAcquiredRecords == null)
+                unresolvedAcquiredRecords = acquiredRecordsListIter.next();
+
+            long unresolvedFirstOffset = unresolvedAcquiredRecords.firstOffset();
+            long unresolvedLastOffset = unresolvedAcquiredRecords.lastOffset();
+            short unresolvedDeliveryCount = unresolvedAcquiredRecords.deliveryCount();
+
+            if (batchToArchive == null) {
+                result.add(unresolvedAcquiredRecords);
+                unresolvedAcquiredRecords = null;
+                continue;
+            }
+
+            // Non-overlap check - the unresolvedAcquiredRecords offsets lie before the batchToArchive offsets. No need to filter out the offsets in such a scenario.
+            if (unresolvedLastOffset < batchToArchive.baseOffset()) {
+                // Offsets in unresolvedAcquiredRecords do not overlap with batchToArchive, hence it should not get filtered out.
+                result.add(unresolvedAcquiredRecords);
+                unresolvedAcquiredRecords = null;
+            }
+
+            // Overlap check - the unresolvedAcquiredRecords offsets overlap with the batchToArchive offsets. We need to filter out the overlapping
+            // offsets in such a scenario.
+            if (unresolvedFirstOffset <= batchToArchive.lastOffset() &&
+                unresolvedLastOffset >= batchToArchive.baseOffset()) {
+                unresolvedAcquiredRecords = null;
+                // Split the unresolvedAcquiredRecords into parts - before and after the overlapping record batchToArchive.
+                if (unresolvedFirstOffset < batchToArchive.baseOffset()) {
+                    // The offsets in unresolvedAcquiredRecords that are present before batchToArchive's baseOffset should not get filtered out.
+                    result.add(new AcquiredRecords()
+                        .setFirstOffset(unresolvedFirstOffset)
+                        .setLastOffset(batchToArchive.baseOffset() - 1)
+                        .setDeliveryCount(unresolvedDeliveryCount));
+                }
+                if (unresolvedLastOffset > batchToArchive.lastOffset()) {
+                    // The offsets in unresolvedAcquiredRecords that are present after batchToArchive's lastOffset should not get filtered out
+                    // and should be taken forward for further processing since they could potentially contain offsets that need to be archived.
+                    unresolvedAcquiredRecords = new AcquiredRecords()
+                        .setFirstOffset(batchToArchive.lastOffset() + 1)
+                        .setLastOffset(unresolvedLastOffset)
+                        .setDeliveryCount(unresolvedDeliveryCount);
+                }
+            }
+
+            // There is at least one offset in unresolvedAcquiredRecords which lies after the batchToArchive. Hence, we move forward
+            // the batchToArchive to the next element in batchesToArchiveIterator.
+            if (unresolvedLastOffset > batchToArchive.lastOffset()) {
+                if (batchesToArchiveIterator.hasNext())
+                    batchToArchive = batchesToArchiveIterator.next();
+                else
+                    batchToArchive = null;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * This function fetches the sub map from cachedState where all the offset details present in the recordBatch can be referred to,
+     * OR it throws an exception if those offsets are not present in cachedState.
+     * @param recordBatch The record batch for which we want to find the sub map.
+     * @return the sub map containing all the offset details.
+     */
+    private NavigableMap<Long, InFlightBatch> fetchSubMap(RecordBatch recordBatch) {
+        lock.readLock().lock();
+        try {
+            Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(recordBatch.baseOffset());
+            if (floorEntry == null) {
+                log.debug("Fetched batch record {} not found for share partition: {}-{}", recordBatch, groupId,
+                    topicIdPartition);
+                throw new IllegalStateException(
+                    "Batch record not found. The request batch offsets are not found in the cache.");
+            }
+            return cachedState.subMap(floorEntry.getKey(), true, recordBatch.lastOffset(), true);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<RecordBatch> fetchAbortedTransactionRecordBatches(
+        Iterable<? extends RecordBatch> batches,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        PriorityQueue<FetchResponseData.AbortedTransaction> orderedAbortedTransactions = orderedAbortedTransactions(abortedTransactions);
+        Set<Long> abortedProducerIds = new HashSet<>();
+        List<RecordBatch> recordsToArchive = new ArrayList<>();
+
+        for (RecordBatch currentBatch : batches) {
+            if (currentBatch.hasProducerId()) {
+                // Remove from the aborted transactions queue all aborted transactions which have begun before the
+                // current batch's last offset, and add the associated producerIds to the aborted producer set.
+                while (!orderedAbortedTransactions.isEmpty() && orderedAbortedTransactions.peek().firstOffset() <= currentBatch.lastOffset()) {
+                    FetchResponseData.AbortedTransaction abortedTransaction = orderedAbortedTransactions.poll();
+                    abortedProducerIds.add(abortedTransaction.producerId());
+                }
+                long producerId = currentBatch.producerId();
+                if (containsAbortMarker(currentBatch)) {
+                    abortedProducerIds.remove(producerId);
+                } else if (isBatchAborted(currentBatch, abortedProducerIds)) {
+                    log.debug("Skipping aborted record batch for share partition: {}-{} with producerId {} and " +
+                        "offsets {} to {}", groupId, topicIdPartition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
+                    recordsToArchive.add(currentBatch);
+                }
+            }
+        }
+        return recordsToArchive;
+    }
+
+    private PriorityQueue<FetchResponseData.AbortedTransaction> orderedAbortedTransactions(List<FetchResponseData.AbortedTransaction> abortedTransactions) {
+        PriorityQueue<FetchResponseData.AbortedTransaction> orderedAbortedTransactions = new PriorityQueue<>(
+            abortedTransactions.size(), Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
+        );
+        orderedAbortedTransactions.addAll(abortedTransactions);
+        return orderedAbortedTransactions;
+    }
+
+    private boolean isBatchAborted(RecordBatch batch, Set<Long> abortedProducerIds) {
+        return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
+    }
+
+    // Visible for testing.
+    boolean containsAbortMarker(RecordBatch batch) {
+        if (!batch.isControlBatch())
+            return false;
+
+        Iterator<Record> batchIterator = batch.iterator();
+        if (!batchIterator.hasNext())
+            return false;
+
+        Record firstRecord = batchIterator.next();
+        return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
+    }
+
// Visible for testing. Should only be used for testing purposes.
NavigableMap<Long, InFlightBatch> cachedState() {
return new ConcurrentSkipListMap<>(cachedState);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5894d3a40d3..68c923cb9e8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -57,7 +57,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.{Group, GroupConfigManager, GroupCoordinator}
+import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
@@ -3210,7 +3210,7 @@ class KafkaApis(val requestChannel: RequestChannel,
shareFetchRequest.maxWait,
fetchMinBytes,
fetchMaxBytes,
-      FetchIsolation.HIGH_WATERMARK,
+      FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
clientMetadata,
true
)
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 6f720526eef..bb8b51b40e2 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -181,7 +181,7 @@ public class DelayedShareFetchTest {
        when(sp0.canAcquireRecords()).thenReturn(true);
        when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        // We are testing the case when the share partition is getting fetched for the first time, so for the first time
@@ -253,7 +253,7 @@ public class DelayedShareFetchTest {
        when(sp0.canAcquireRecords()).thenReturn(true);
        when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        // We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
@@ -305,7 +305,7 @@ public class DelayedShareFetchTest {
        when(sp0.canAcquireRecords()).thenReturn(true);
        when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -418,7 +418,7 @@ public class DelayedShareFetchTest {
        when(sp0.canAcquireRecords()).thenReturn(true);
        when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -580,7 +580,7 @@ public class DelayedShareFetchTest {
        // sp1 can be acquired now
        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
        when(sp1.canAcquireRecords()).thenReturn(true);
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        // when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
@@ -676,7 +676,7 @@
            BROKER_TOPIC_STATS);
        when(sp0.canAcquireRecords()).thenReturn(true);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -919,15 +919,15 @@
            new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS,
            BROKER_TOPIC_STATS);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        // All 5 partitions are acquirable.
@@ -1015,9 +1015,9 @@
            new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS,
            BROKER_TOPIC_STATS);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        // Only 2 out of 5 partitions are acquirable.
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 2103a4fc361..d5acaef2060 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -98,10 +98,10 @@ public class ShareFetchUtilsTest {
        when(sp0.nextFetchOffset()).thenReturn((long) 3);
        when(sp1.nextFetchOffset()).thenReturn((long) 3);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
@@ -163,8 +163,8 @@
        when(sp0.nextFetchOffset()).thenReturn((long) 3);
        when(sp1.nextFetchOffset()).thenReturn((long) 3);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
@@ -221,11 +221,11 @@
        when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
        when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            ShareAcquiredRecords.empty(),
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)),
            ShareAcquiredRecords.empty());
@@ -309,7 +309,7 @@
        // Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition.
        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty());
        doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
            new SimpleRecord("0".getBytes(), "v".getBytes()),
@@ -390,10 +390,10 @@
            records2, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
-        when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, fetchPartitionData1)).thenReturn(
+        when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, fetchPartitionData1, FetchIsolation.HIGH_WATERMARK)).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1)));
-        when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, fetchPartitionData2)).thenReturn(
+ when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0,
fetchPartitionData2, FetchIsolation.HIGH_WATERMARK)).thenReturn(
createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
@@ -444,7 +444,7 @@ public class ShareFetchUtilsTest {
// Mock the replicaManager.fetchOffsetForTimestamp method to throw
exception.
Throwable exception = new FencedLeaderEpochException("Fenced
exception");
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class),
anyLong(), any(), any(), anyBoolean());
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+ when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
// When no records are acquired from share partition.
List<ShareFetchPartitionData> responseData = List.of(
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index a87c1dd735d..a69c6c83071 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1119,13 +1119,13 @@ public class SharePartitionManagerTest {
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp2.canAcquireRecords()).thenReturn(true);
when(sp3.canAcquireRecords()).thenReturn(true);
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class))).thenReturn(
+ when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class))).thenReturn(
+ when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class))).thenReturn(
+ when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class))).thenReturn(
+ when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
// Mocks to have fetch offset metadata match for share partitions to
avoid any extra calls to replicaManager.readFromLog.
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
@@ -1788,8 +1788,8 @@ public class SharePartitionManagerTest {
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp2.maybeAcquireFetchLock()).thenReturn(true);
when(sp2.canAcquireRecords()).thenReturn(false);
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any())).thenReturn(ShareAcquiredRecords.empty());
- when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any())).thenReturn(ShareAcquiredRecords.empty());
+ when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(ShareAcquiredRecords.empty());
+ when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(ShareAcquiredRecords.empty());
List<DelayedOperationKey> delayedShareFetchWatchKeys = new
ArrayList<>();
topicIdPartitions.forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId,
topicIdPartition.topicId(), topicIdPartition.partition())));
@@ -2034,7 +2034,7 @@ public class SharePartitionManagerTest {
when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId,
Uuid.fromString(memberId))).thenReturn(List.of(tp1, tp3));
doAnswer(invocation ->
buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
+ when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
// Release acquired records on session close request for tp1 and tp3.
sharePartitionManager.releaseSession(groupId, memberId);
@@ -2604,7 +2604,7 @@ public class SharePartitionManagerTest {
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
+ when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
// Fail initialization for tp2.
SharePartition sp2 = mock(SharePartition.class);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 91e3435a2f0..64781648774 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -35,12 +35,18 @@ import
org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.MockTime;
@@ -58,6 +64,7 @@ import
org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.ReadShareGroupStateResult;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
+import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
@@ -74,6 +81,7 @@ import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -114,6 +122,7 @@ public class SharePartitionTest {
private static final int DEFAULT_FETCH_OFFSET = 0;
private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
private static final byte ACKNOWLEDGE_TYPE_GAP_ID = 0;
+ private static final FetchIsolation FETCH_ISOLATION_HWM =
FetchIsolation.HIGH_WATERMARK;
private static Timer mockTimer;
private SharePartitionMetrics sharePartitionMetrics;
@@ -1152,7 +1161,8 @@ public class SharePartitionTest {
BATCH_SIZE,
10,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
5);
assertArrayEquals(expectedAcquiredRecord(0, 4, 1).toArray(),
acquiredRecordsList.toArray());
@@ -1173,7 +1183,8 @@ public class SharePartitionTest {
BATCH_SIZE,
10,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
20);
assertArrayEquals(expectedAcquiredRecord(5, 24, 1).toArray(),
acquiredRecordsList.toArray());
@@ -1210,7 +1221,8 @@ public class SharePartitionTest {
BATCH_SIZE,
10,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records, 10)),
+ fetchPartitionData(records, 10),
+ FETCH_ISOLATION_HWM),
20);
// Validate 2 batches are fetched one with 5 records and other till
end of batch, third batch
@@ -1287,7 +1299,8 @@ public class SharePartitionTest {
BATCH_SIZE,
MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(MemoryRecords.EMPTY)),
+ fetchPartitionData(MemoryRecords.EMPTY),
+ FETCH_ISOLATION_HWM),
0
);
@@ -1306,7 +1319,8 @@ public class SharePartitionTest {
2 /* Batch size */,
10,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
5);
assertArrayEquals(expectedAcquiredRecord(0, 4, 1).toArray(),
acquiredRecordsList.toArray());
@@ -1337,7 +1351,8 @@ public class SharePartitionTest {
5 /* Batch size */,
100,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
26 /* Gap of 3 records will also be added to first batch */);
// Fetch expected records from 4 batches, but change the first
expected record to include gap offsets.
@@ -1385,7 +1400,8 @@ public class SharePartitionTest {
2 /* Batch size */,
10,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
20);
List<AcquiredRecords> expectedAcquiredRecords =
expectedAcquiredRecords(records, 1);
@@ -1420,7 +1436,8 @@ public class SharePartitionTest {
5 /* Batch size */,
100,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
7 /* Acquisition of records starts post endOffset */);
// Fetch expected single batch, but change the first offset as per
endOffset.
@@ -1451,7 +1468,8 @@ public class SharePartitionTest {
5 /* Batch size */,
100,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
13 /* Acquisition of records starts post endOffset */);
// Fetch expected records from 2 batches, but change the first batch's
first offset as per endOffset.
@@ -1492,7 +1510,8 @@ public class SharePartitionTest {
5 /* Batch size */,
100,
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
5 /* Acquisition of records starts post endOffset */);
// First batch should be skipped and fetch should result in a single batch (second batch), but
@@ -2524,7 +2543,8 @@ public class SharePartitionTest {
BATCH_SIZE,
6, // maxFetchRecords is less than the number of records
fetched
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
6);
// Since max fetch records (6) is less than the number of records
fetched (8), only 6 records will be acquired
@@ -2573,7 +2593,8 @@ public class SharePartitionTest {
BATCH_SIZE,
8, // maxFetchRecords is less than the number of records
fetched
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
10);
assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(),
acquiredRecordsList.toArray());
@@ -2621,7 +2642,8 @@ public class SharePartitionTest {
BATCH_SIZE,
8, // maxFetchRecords is less than the number of records
fetched
DEFAULT_FETCH_OFFSET,
- fetchPartitionData(records)),
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
10);
assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(),
acquiredRecordsList.toArray());
@@ -3742,8 +3764,8 @@ public class SharePartitionTest {
recordsBuilder.appendWithOffset(20, 0L,
TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5,
fetchPartitionData(records1));
- sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(records2));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5,
fetchPartitionData(records1), FETCH_ISOLATION_HWM);
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(records2), FETCH_ISOLATION_HWM);
// Acknowledging over subset of second batch with subset of gap
offsets.
sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(10, 18, List.of(
@@ -3812,8 +3834,8 @@ public class SharePartitionTest {
recordsBuilder.appendWithOffset(20, 0L,
TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
MemoryRecords records2 = recordsBuilder.build();
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5,
fetchPartitionData(records1));
- sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(records2));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5,
fetchPartitionData(records1), FETCH_ISOLATION_HWM);
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(records2), FETCH_ISOLATION_HWM);
// Acknowledging over subset of second batch with subset of gap
offsets.
sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(10, 18, List.of(
@@ -4745,7 +4767,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(5, 15)));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5);
@@ -4883,7 +4905,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(5, 15)));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5);
@@ -5961,7 +5983,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(5, 10)));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(5, 10)), FETCH_ISOLATION_HWM);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 15), 5);
@@ -5992,7 +6014,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(5, 15)));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15,
fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM);
CompletableFuture<Void> ackResult =
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)),
@@ -6109,12 +6131,12 @@ public class SharePartitionTest {
String memberId1 = "memberId-1";
String memberId2 = "memberId-2";
- sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+ sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
assertFalse(sharePartition.findNextFetchOffset());
assertEquals(10, sharePartition.nextFetchOffset());
- sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(10, 10)));
+ sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM);
assertFalse(sharePartition.findNextFetchOffset());
assertEquals(20, sharePartition.nextFetchOffset());
@@ -6125,7 +6147,7 @@ public class SharePartitionTest {
assertTrue(sharePartition.findNextFetchOffset());
assertEquals(5, sharePartition.nextFetchOffset());
- sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+ sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
assertTrue(sharePartition.findNextFetchOffset());
assertEquals(20, sharePartition.nextFetchOffset());
@@ -6142,17 +6164,17 @@ public class SharePartitionTest {
String memberId1 = MEMBER_ID;
String memberId2 = "member-2";
- sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+ sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
assertEquals(3, sharePartition.nextFetchOffset());
sharePartition.acknowledge(memberId1, List.of(
new ShareAcknowledgementBatch(0, 2, List.of((byte) 2))));
assertEquals(0, sharePartition.nextFetchOffset());
- sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3,
fetchPartitionData(memoryRecords(2, 3)));
+ sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3,
fetchPartitionData(memoryRecords(2, 3)), FETCH_ISOLATION_HWM);
assertEquals(0, sharePartition.nextFetchOffset());
- sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+ sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS,
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
assertEquals(5, sharePartition.nextFetchOffset());
sharePartition.acknowledge(memberId2, List.of(
@@ -6193,11 +6215,11 @@ public class SharePartitionTest {
new ShareAcknowledgementBatch(17, 20, List.of((byte) 2))));
// Reacquire with another member.
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5,
fetchPartitionData(records1));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5,
fetchPartitionData(records1), FETCH_ISOLATION_HWM);
assertEquals(10, sharePartition.nextFetchOffset());
// Reacquire with another member.
- sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(7, 10)));
+ sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10,
fetchPartitionData(memoryRecords(7, 10)), FETCH_ISOLATION_HWM);
assertEquals(17, sharePartition.nextFetchOffset());
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
@@ -6669,15 +6691,441 @@ public class SharePartitionTest {
return errorMessage.toString();
}
+ @Test
+ public void testFilterRecordBatchesFromAcquiredRecords() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ List<AcquiredRecords> acquiredRecords1 = List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short)
1)
+ );
+ List<RecordBatch> recordBatches1 = List.of(
+ memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+ memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+ );
+ assertEquals(
+ List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short)
1)),
+
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1,
recordBatches1));
+
+ List<AcquiredRecords> acquiredRecords2 = List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short)
3),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short)
3),
+ new
AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short)
3)
+ );
+ List<RecordBatch> recordBatches2 = List.of(
+ memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+ memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+ );
+ assertEquals(
+ List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short)
3),
+ new
AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short)
3)
+
+ ),
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2,
recordBatches2)
+ );
+
+ // The record batches list is empty.
+ assertEquals(acquiredRecords2,
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2,
List.of()));
+
+ List<AcquiredRecords> acquiredRecords3 = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short)
1)
+ );
+ List<RecordBatch> recordBatches3 = List.of(
+ memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+ memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+ );
+
+ assertEquals(
+ List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short)
1)
+
+ ),
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3,
recordBatches3)
+ );
+ }
+
+ @Test
+ public void testAcquireWithReadCommittedIsolationLevel() {
+ SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 5, 10).close();
+ memoryRecordsBuilder(buffer, 5, 15).close();
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ memoryRecordsBuilder(buffer, 8, 50).close();
+ memoryRecordsBuilder(buffer, 10, 58).close();
+ memoryRecordsBuilder(buffer, 5, 70).close();
+
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ FetchPartitionData fetchPartitionData = fetchPartitionData(records,
newAbortedTransactions());
+
+ // We mock the result of fetchAbortedTransactionRecordBatches; the records at these offsets need to be archived.
+ // The aborted transactions passed in fetchPartitionData are not used directly.
+
when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(),
fetchPartitionData.abortedTransactions.get())).thenReturn(
+ List.of(
+ memoryRecordsBuilder(5,
10).build().batches().iterator().next(),
+ memoryRecordsBuilder(10,
58).build().batches().iterator().next(),
+ memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+ )
+ );
+
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+ sharePartition.acquire(
+ MEMBER_ID,
+ 10 /* Batch size */,
+ 100,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData,
+ FetchIsolation.TXN_COMMITTED),
+ 45 /* Gap of 15 records will be added to second batch, gap of 2
records will also be added to fourth batch */);
+
+ assertEquals(List.of(
+ new
AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short)
1)
+ ), acquiredRecordsList);
+ assertEquals(75, sharePartition.nextFetchOffset());
+
+ // Checking cached state.
+ assertEquals(4, sharePartition.cachedState().size());
+ assertTrue(sharePartition.cachedState().containsKey(10L));
+ assertTrue(sharePartition.cachedState().containsKey(20L));
+ assertTrue(sharePartition.cachedState().containsKey(50L));
+ assertTrue(sharePartition.cachedState().containsKey(70L));
+ assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+ assertNotNull(sharePartition.cachedState().get(50L).offsetState());
+
+ assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset());
+ assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset());
+ assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset());
+ assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(70L).batchState());
+
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(20L).batchMemberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(70L).batchMemberId());
+
+
assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask());
+
+ Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
+
+ expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(50L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(51L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(52L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(53L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(54L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(55L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(56L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(57L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(58L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(59L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(60L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(61L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(62L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(63L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(64L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(65L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(66L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(67L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(68L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(69L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(50L).offsetState());
+
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask());
+ }
+
+ @Test
+ public void testContainsAbortMarker() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // Record batch is not a control batch.
+ RecordBatch recordBatch = mock(RecordBatch.class);
+ when(recordBatch.isControlBatch()).thenReturn(false);
+ assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+ // Record batch is a control batch but doesn't contain any records.
+ recordBatch = mock(RecordBatch.class);
+ Iterator batchIterator = mock(Iterator.class);
+ when(batchIterator.hasNext()).thenReturn(false);
+ when(recordBatch.iterator()).thenReturn(batchIterator);
+ when(recordBatch.isControlBatch()).thenReturn(true);
+ assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+ // Record batch is a control batch which contains a record of type
ControlRecordType.ABORT.
+ recordBatch = mock(RecordBatch.class);
+ batchIterator = mock(Iterator.class);
+ when(batchIterator.hasNext()).thenReturn(true);
+ DefaultRecord record = mock(DefaultRecord.class);
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ // Buffer has to be created in a way that
ControlRecordType.parse(buffer) returns ControlRecordType.ABORT.
+ buffer.putShort((short) 5);
+ buffer.putShort(ControlRecordType.ABORT.type());
+ buffer.putInt(23432); // some field added in version 5
+ buffer.flip();
+ when(record.key()).thenReturn(buffer);
+ when(batchIterator.next()).thenReturn(record);
+ when(recordBatch.iterator()).thenReturn(batchIterator);
+ when(recordBatch.isControlBatch()).thenReturn(true);
+ assertTrue(sharePartition.containsAbortMarker(recordBatch));
+
+ // Record batch is a control batch which contains a record of type
ControlRecordType.COMMIT.
+ recordBatch = mock(RecordBatch.class);
+ batchIterator = mock(Iterator.class);
+ when(batchIterator.hasNext()).thenReturn(true);
+ record = mock(DefaultRecord.class);
+ buffer = ByteBuffer.allocate(4096);
+ // Buffer has to be created in a way that
ControlRecordType.parse(buffer) returns ControlRecordType.COMMIT.
+ buffer.putShort((short) 5);
+ buffer.putShort(ControlRecordType.COMMIT.type());
+ buffer.putInt(23432); // some field added in version 5
+ buffer.flip();
+ when(record.key()).thenReturn(buffer);
+ when(batchIterator.next()).thenReturn(record);
+ when(recordBatch.iterator()).thenReturn(batchIterator);
+ when(recordBatch.isControlBatch()).thenReturn(true);
+ assertFalse(sharePartition.containsAbortMarker(recordBatch));
+ }
+
+ @Test
+ public void
testFetchAbortedTransactionRecordBatchesForOnlyAbortedTransactions() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // Case 1 - Creating 10 transactional records in a single batch followed by an ABORT marker record for producerId 1.
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 1, 0);
+ buffer.flip();
+ Records records = MemoryRecords.readableRecords(buffer);
+
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+ );
+ // Records from 0 to 9 should be archived because they are part of aborted transactions.
+ List<RecordBatch> actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(1, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(9, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+
+ // Case 2: 3 individual batches, each followed by an ABORT marker record for producerId 1.
+ buffer = ByteBuffer.allocate(1024);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 0);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 2);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 4);
+ buffer.flip();
+ records = MemoryRecords.readableRecords(buffer);
+ abortedTransactions = List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(2).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(4).setProducerId(1)
+ );
+
+ actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(3, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(0, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+ assertEquals(2, actual.get(1).baseOffset());
+ assertEquals(2, actual.get(1).lastOffset());
+ assertEquals(1, actual.get(1).producerId());
+ assertEquals(4, actual.get(2).baseOffset());
+ assertEquals(4, actual.get(2).lastOffset());
+ assertEquals(1, actual.get(2).producerId());
+
+ // Case 3: The producer id of the records is different, so they should not be archived.
+ buffer = ByteBuffer.allocate(1024);
+ // We are creating 10 transactional records followed by an ABORT marker record for producerId 2.
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 2, 0);
+ buffer.flip();
+ records = MemoryRecords.readableRecords(buffer);
+ abortedTransactions = List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+ );
+
+ actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(0, actual.size());
+ }
+
+ @Test
+ public void
testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransactions() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 0);
+ newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 2, 3);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 2, 6);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 9);
+ newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 1, 12);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 15);
+ buffer.flip();
+ Records records = MemoryRecords.readableRecords(buffer);
+
+ // Case 1 - Aborted transactions do not contain the record batch from offsets 6-7 with producer id 2.
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(9).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(15).setProducerId(1)
+ );
+
+ List<RecordBatch> actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(3, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(1, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+ assertEquals(9, actual.get(1).baseOffset());
+ assertEquals(10, actual.get(1).lastOffset());
+ assertEquals(1, actual.get(1).producerId());
+ assertEquals(15, actual.get(2).baseOffset());
+ assertEquals(16, actual.get(2).lastOffset());
+ assertEquals(1, actual.get(2).producerId());
+
+ // Case 2 - Aborted transactions contain the record batch from offsets 6-7 with producer id 2.
+ abortedTransactions = List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(2),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(9).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(15).setProducerId(1)
+ );
+
+ actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(4, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(1, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+ assertEquals(6, actual.get(1).baseOffset());
+ assertEquals(7, actual.get(1).lastOffset());
+ assertEquals(2, actual.get(1).producerId());
+ assertEquals(9, actual.get(2).baseOffset());
+ assertEquals(10, actual.get(2).lastOffset());
+ assertEquals(1, actual.get(2).producerId());
+ assertEquals(15, actual.get(3).baseOffset());
+ assertEquals(16, actual.get(3).lastOffset());
+ assertEquals(1, actual.get(3).producerId());
+ }
+
+ /**
+ * This function produces transactional data of a given number of records, followed by a transaction marker (COMMIT/ABORT).
+ */
+ private void newTransactionalRecords(ByteBuffer buffer, ControlRecordType
controlRecordType, int numRecords, long producerId, long baseOffset) {
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ baseOffset,
+ MOCK_TIME.milliseconds(),
+ producerId,
+ (short) 0,
+ 0,
+ true,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+ for (int i = 0; i < numRecords; i++)
+ builder.append(new SimpleRecord(MOCK_TIME.milliseconds(),
"key".getBytes(), "value".getBytes()));
+
+ builder.build();
+ }
+ writeTransactionMarker(buffer, controlRecordType, (int) baseOffset +
numRecords, producerId);
+ }
+
+ private void writeTransactionMarker(ByteBuffer buffer, ControlRecordType
controlRecordType, int offset, long producerId) {
+ MemoryRecords.writeEndTransactionalMarker(buffer,
+ offset,
+ MOCK_TIME.milliseconds(),
+ 0,
+ producerId,
+ (short) 0,
+ new EndTransactionMarker(controlRecordType, 0));
+ }
+
+ private List<FetchResponseData.AbortedTransaction>
newAbortedTransactions() {
+ FetchResponseData.AbortedTransaction abortedTransaction = new
FetchResponseData.AbortedTransaction();
+ abortedTransaction.setFirstOffset(0);
+ abortedTransaction.setProducerId(1000L);
+ return List.of(abortedTransaction);
+ }
+
private FetchPartitionData fetchPartitionData(Records records) {
return fetchPartitionData(records, 0);
}
+ private FetchPartitionData fetchPartitionData(Records records,
List<FetchResponseData.AbortedTransaction> abortedTransactions) {
+ return fetchPartitionData(records, 0, abortedTransactions);
+ }
+
private FetchPartitionData fetchPartitionData(Records records, long
logStartOffset) {
return new FetchPartitionData(Errors.NONE, 5, logStartOffset, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false);
}
+ private FetchPartitionData fetchPartitionData(Records records, long
logStartOffset, List<FetchResponseData.AbortedTransaction> abortedTransactions)
{
+ return new FetchPartitionData(Errors.NONE, 5, logStartOffset, records,
+ Optional.empty(), OptionalLong.empty(),
Optional.of(abortedTransactions), OptionalInt.empty(), false);
+ }
+
private List<AcquiredRecords> fetchAcquiredRecords(SharePartition
sharePartition, Records records, long logStartOffset, int expectedOffsetCount) {
return fetchAcquiredRecords(sharePartition, records,
records.batches().iterator().next().baseOffset(), logStartOffset,
expectedOffsetCount);
}
@@ -6688,7 +7136,8 @@ public class SharePartitionTest {
BATCH_SIZE,
MAX_FETCH_RECORDS,
fetchOffset,
- fetchPartitionData(records, logStartOffset));
+ fetchPartitionData(records, logStartOffset),
+ FETCH_ISOLATION_HWM);
return fetchAcquiredRecords(shareAcquiredRecords, expectedOffsetCount);
}
@@ -6698,7 +7147,8 @@ public class SharePartitionTest {
BATCH_SIZE,
MAX_FETCH_RECORDS,
records.batches().iterator().next().baseOffset(),
- fetchPartitionData(records));
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM);
return fetchAcquiredRecords(shareAcquiredRecords, expectedOffsetCount);
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e933b1985c7..b4768a90317 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -75,7 +75,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -340,6 +340,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
+ cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
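
KafkaApis now surfaces SHARE_ISOLATION_LEVEL_CONFIG alongside the other dynamic share group configs. Across the tests in this commit, read_uncommitted pairs with FetchIsolation.HIGH_WATERMARK and read_committed with FetchIsolation.TXN_COMMITTED; the sketch below captures that mapping as the tests imply it, not necessarily as the broker code spells it:

    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.server.storage.log.FetchIsolation;

    final class ShareFetchIsolationSketch {
        // Assumption drawn from the tests in this commit, not copied from KafkaApis:
        // read_committed share fetches use TXN_COMMITTED, everything else HIGH_WATERMARK.
        static FetchIsolation forShareGroup(IsolationLevel level) {
            return level == IsolationLevel.READ_COMMITTED
                    ? FetchIsolation.TXN_COMMITTED
                    : FetchIsolation.HIGH_WATERMARK;
        }
    }
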
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 6dd859e1354..c7569fdf47a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -17,12 +17,14 @@
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -32,6 +34,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
* Group configuration related parameters and supporting methods like
validation, etc. are
@@ -59,6 +62,13 @@ public final class GroupConfig extends AbstractConfig {
"Negative duration is not allowed.</li>" +
"<li>anything else: throw exception to the share consumer.</li></ul>";
+ public static final String SHARE_ISOLATION_LEVEL_CONFIG =
"share.isolation.level";
+ public static final String SHARE_ISOLATION_LEVEL_DEFAULT =
IsolationLevel.READ_UNCOMMITTED.toString();
+ public static final String SHARE_ISOLATION_LEVEL_DOC = "Controls how to
read records written transactionally. " +
+ "If set to \"read_committed\", the share group will only deliver
transactional records which have been committed. " +
+ "If set to \"read_uncommitted\", the share group will return all
messages, even transactional messages which have been aborted. " +
+ "Non-transactional records will be returned unconditionally in either
mode.";
+
public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG =
"streams.session.timeout.ms";
public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG =
"streams.heartbeat.interval.ms";
@@ -83,6 +93,8 @@ public final class GroupConfig extends AbstractConfig {
public final int streamsNumStandbyReplicas;
+ public final String shareIsolationLevel;
+
private static final ConfigDef CONFIG = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
@@ -120,6 +132,12 @@ public final class GroupConfig extends AbstractConfig {
new ShareGroupAutoOffsetResetStrategy.Validator(),
MEDIUM,
SHARE_AUTO_OFFSET_RESET_DOC)
+ .define(SHARE_ISOLATION_LEVEL_CONFIG,
+ STRING,
+ SHARE_ISOLATION_LEVEL_DEFAULT,
+ in(IsolationLevel.READ_COMMITTED.toString(),
IsolationLevel.READ_UNCOMMITTED.toString()),
+ MEDIUM,
+ SHARE_ISOLATION_LEVEL_DOC)
.define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -150,6 +168,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsSessionTimeoutMs =
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+ this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
}
public static ConfigDef configDef() {
@@ -290,6 +309,13 @@ public final class GroupConfig extends AbstractConfig {
return
ShareGroupAutoOffsetResetStrategy.fromString(SHARE_AUTO_OFFSET_RESET_DEFAULT);
}
+ /**
+ * The default share group isolation level.
+ */
+ public static IsolationLevel defaultShareIsolationLevel() {
+ return
IsolationLevel.valueOf(SHARE_ISOLATION_LEVEL_DEFAULT.toUpperCase(Locale.ROOT));
+ }
+
/**
* The consumer group session timeout in milliseconds.
*/
@@ -352,4 +378,18 @@ public final class GroupConfig extends AbstractConfig {
public int streamsNumStandbyReplicas() {
return streamsNumStandbyReplicas;
}
+
+ /**
+ * The share group isolation level.
+ */
+ public IsolationLevel shareIsolationLevel() {
+ if (shareIsolationLevel == null) {
+ throw new IllegalArgumentException("Share isolation level is
null");
+ }
+ try {
+ return
IsolationLevel.valueOf(shareIsolationLevel.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unknown share isolation level: " + shareIsolationLevel);
+ }
+ }
}
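
The shareIsolationLevel() accessor above upper-cases the stored string before IsolationLevel.valueOf, and the validator only admits the lowercase names that IsolationLevel.toString() produces. A small self-contained sketch of that round trip (class name illustrative):

    import org.apache.kafka.common.IsolationLevel;

    import java.util.Locale;

    public class IsolationLevelRoundTrip {
        public static void main(String[] args) {
            // IsolationLevel.toString() yields the lowercase form stored in the config.
            String stored = IsolationLevel.READ_COMMITTED.toString();
            IsolationLevel parsed = IsolationLevel.valueOf(stored.toUpperCase(Locale.ROOT));
            System.out.println(stored + " -> " + parsed.name()); // read_committed -> READ_COMMITTED
        }
    }
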
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 4e77eb15125..50205b0a9e9 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -61,6 +61,8 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if
(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
assertPropertyInvalid(name, "hello", "1.0");
+ } else if (GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "hello", "1.0");
} else if
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -100,6 +102,19 @@ public class GroupConfigTest {
doTestValidProps(props);
}
+ @Test
+ public void testValidShareIsolationLevelValues() {
+ // Check for value READ_COMMITTED
+ Properties props = createValidGroupConfig();
+ props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed");
+ doTestValidProps(props);
+
+ // Check for value READ_UNCOMMITTED
+ props = createValidGroupConfig();
+ props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
+ doTestValidProps(props);
+ }
+
@Test
public void testInvalidProps() {
@@ -190,6 +205,16 @@ public class GroupConfigTest {
// Check for invalid streamsHeartbeatIntervalMs, > MAX
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareIsolationLevel.
+ props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
+ doTestInvalidProps(props, ConfigException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareIsolationLevel.
+ props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_uncommit");
+ doTestInvalidProps(props, ConfigException.class);
}
private void doTestInvalidProps(Properties props, Class<? extends
Exception> exceptionClassName) {
@@ -209,6 +234,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+ defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
@@ -223,6 +249,7 @@ public class GroupConfigTest {
assertEquals(10,
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
assertEquals("latest",
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
+ assertEquals("read_uncommitted",
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
@@ -244,6 +271,7 @@ public class GroupConfigTest {
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+ props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
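
Since share.isolation.level is a dynamic group config, it can be set per share group like the other group configs exercised above (e.g. share.auto.offset.reset). A sketch using the Admin client, assuming a local broker, a hypothetical group name, and that ConfigResource.Type.GROUP is the resource type for dynamic group configs:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class SetShareIsolationLevel {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            try (Admin admin = Admin.create(props)) {
                ConfigResource group = new ConfigResource(
                        ConfigResource.Type.GROUP, "my-share-group"); // hypothetical group
                AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("share.isolation.level", "read_committed"),
                        AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
            }
        }
    }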