This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f2ac375c28 KAFKA-15410: Reassign replica expand, move and shrink
integration tests (2/4) (#14328)
9f2ac375c28 is described below
commit 9f2ac375c282e1471a2d385704e1f7c128f34bb6
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Sep 5 19:28:17 2023 +0530
KAFKA-15410: Reassign replica expand, move and shrink integration tests
(2/4) (#14328)
- Updated the log-start-offset to the correct value while building the
replica state in ReplicaFetcherTierStateMachine#buildRemoteLogAuxState
Integration tests added:
1. ReassignReplicaExpandTest
2. ReassignReplicaMoveTest and
3. ReassignReplicaShrinkTest
Reviewers: Satish Duggana <[email protected]>, Luke Chen
<[email protected]>
---
.../server/ReplicaFetcherTierStateMachine.java | 2 +-
core/src/main/scala/kafka/cluster/Partition.scala | 7 +-
core/src/main/scala/kafka/log/LogManager.scala | 8 +-
core/src/main/scala/kafka/log/UnifiedLog.scala | 8 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 48 +++++++++-
.../integration/BaseReassignReplicaTest.java | 100 +++++++++++++++++++++
.../storage/integration/PartitionsExpandTest.java | 3 +-
.../integration/ReassignReplicaExpandTest.java | 32 +++++++
.../integration/ReassignReplicaMoveTest.java | 32 +++++++
...andTest.java => ReassignReplicaShrinkTest.java} | 96 ++++++++++----------
10 files changed, 275 insertions(+), 61 deletions(-)
diff --git
a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
index 3f5a1b2c69f..74fc7caaea2 100644
--- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
+++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
@@ -227,7 +227,7 @@ public class ReplicaFetcherTierStateMachine implements
TierStateMachine {
// Truncate the existing local log before restoring the leader
epoch cache and producer snapshots.
Partition partition =
replicaMgr.getPartitionOrException(topicPartition);
- partition.truncateFullyAndStartAt(nextOffset, false);
+ partition.truncateFullyAndStartAt(nextOffset, false,
Option.apply(leaderLogStartOffset));
// Build leader epoch cache.
unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset,
LeaderOffsetIncremented);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 280ab457614..62cc3f4ee3a 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1646,12 +1646,15 @@ class Partition(val topicPartition: TopicPartition,
*
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the
future log of this partition
+ * @param logStartOffsetOpt The log start offset to set for the log. If
None, the new offset will be used.
*/
- def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit = {
+ def truncateFullyAndStartAt(newOffset: Long,
+ isFuture: Boolean,
+ logStartOffsetOpt: Option[Long] = None): Unit = {
// The read lock is needed to prevent the follower replica from being
truncated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower
replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
- logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture =
isFuture)
+ logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture =
isFuture, logStartOffsetOpt)
}
}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 579d3aba424..54235ae4c0c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -705,8 +705,12 @@ class LogManager(logDirs: Seq[File],
* @param topicPartition The partition whose log needs to be truncated
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the future
log of the specified partition
+ * @param logStartOffsetOpt The log start offset to set for the log. If
None, the new offset will be used.
*/
- def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long,
isFuture: Boolean): Unit = {
+ def truncateFullyAndStartAt(topicPartition: TopicPartition,
+ newOffset: Long,
+ isFuture: Boolean,
+ logStartOffsetOpt: Option[Long] = None): Unit = {
val log = {
if (isFuture)
futureLogs.get(topicPartition)
@@ -719,7 +723,7 @@ class LogManager(logDirs: Seq[File],
if (!isFuture)
abortAndPauseCleaning(topicPartition)
try {
- log.truncateFullyAndStartAt(newOffset)
+ log.truncateFullyAndStartAt(newOffset, logStartOffsetOpt)
if (!isFuture)
maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log,
topicPartition)
} finally {
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c289cf25fef..f5a01c15637 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1745,15 +1745,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Delete all data in the log and start at the new offset
*
* @param newOffset The new offset to start the log with
+ * @param logStartOffsetOpt The log start offset to set for the log. If
None, the new offset will be used.
*/
- def truncateFullyAndStartAt(newOffset: Long): Unit = {
+ def truncateFullyAndStartAt(newOffset: Long,
+ logStartOffsetOpt: Option[Long] = None): Unit = {
maybeHandleIOException(s"Error while truncating the entire log for
$topicPartition in dir ${dir.getParent}") {
- debug(s"Truncate and start at offset $newOffset")
+ debug(s"Truncate and start at offset $newOffset, logStartOffset:
${logStartOffsetOpt.getOrElse(newOffset)}")
lock synchronized {
localLog.truncateFullyAndStartAt(newOffset)
leaderEpochCache.foreach(_.clearAndFlush())
producerStateManager.truncateFullyAndStartAt(newOffset)
- logStartOffset = newOffset
+ logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
rebuildProducerState(newOffset, producerStateManager)
updateHighWatermark(localLog.logEndOffsetMetadata)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index c6806479a18..90d911e0adf 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -171,15 +171,55 @@ class UnifiedLogTest {
@Test
def testTruncateBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset(_.truncateTo)
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) =>
log.truncateTo(targetOffset))
}
@Test
def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset(_.truncateFullyAndStartAt)
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) =>
log.truncateFullyAndStartAt(targetOffset))
}
- private def testTruncateBelowFirstUnstableOffset(truncateFunc: UnifiedLog =>
(Long => Unit)): Unit = {
+ @Test
+ def testTruncateFullyAndStart(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = createLog(logDir, logConfig)
+
+ val producerId = 17L
+ val producerEpoch: Short = 10
+ val sequence = 0
+
+ log.appendAsLeader(TestUtils.records(List(
+ new SimpleRecord("0".getBytes),
+ new SimpleRecord("1".getBytes),
+ new SimpleRecord("2".getBytes)
+ )), leaderEpoch = 0)
+
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ CompressionType.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes),
+ new SimpleRecord("4".getBytes)
+ ), leaderEpoch = 0)
+
+ assertEquals(Some(3L), log.firstUnstableOffset)
+
+ // We close and reopen the log to ensure that the first unstable offset
segment
+ // position will be undefined when we truncate the log.
+ log.close()
+
+ val reopened = createLog(logDir, logConfig)
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager.firstUnstableOffset)
+
+ reopened.truncateFullyAndStartAt(2L, Some(1L))
+ assertEquals(None, reopened.firstUnstableOffset)
+ assertEquals(java.util.Collections.emptyMap(),
reopened.producerStateManager.activeProducers)
+ assertEquals(1L, reopened.logStartOffset)
+ assertEquals(2L, reopened.logEndOffset)
+ }
+
+ private def testTruncateBelowFirstUnstableOffset(truncateFunc: (UnifiedLog,
Long) => Unit): Unit = {
// Verify that truncation below the first unstable offset correctly
// resets the producer state. Specifically we are testing the case when
// the segment position of the first unstable offset is unknown.
@@ -215,7 +255,7 @@ class UnifiedLogTest {
val reopened = createLog(logDir, logConfig)
assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager.firstUnstableOffset)
- truncateFunc(reopened)(0L)
+ truncateFunc(reopened, 0L)
assertEquals(None, reopened.firstUnstableOffset)
assertEquals(java.util.Collections.emptyMap(),
reopened.producerStateManager.activeProducers)
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
new file mode 100644
index 00000000000..afac6513036
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness
{
+ protected final Integer broker0 = 0;
+ protected final Integer broker1 = 1;
+
+ /**
+ * Cluster of two brokers
+ * @return number of brokers in the cluster
+ */
+ @Override
+ public int brokerCount() {
+ return 2;
+ }
+
+ /**
+ * Number of partitions in the '__remote_log_metadata' topic
+ * @return number of partitions in the '__remote_log_metadata' topic
+ */
+ @Override
+ public int numRemoteLogMetadataPartitions() {
+ return 2;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicA = "topicA";
+ final String topicB = "topicB";
+ final Integer p0 = 0;
+ final Integer partitionCount = 5;
+ final Integer replicationFactor = 2;
+ final Integer maxBatchCountPerSegment = 1;
+ final Map<Integer, List<Integer>> replicaAssignment = null;
+ final boolean enableRemoteLogStorage = true;
+ final List<Integer> metadataPartitions = new ArrayList<>();
+ for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) {
+ metadataPartitions.add(i);
+ }
+
+ builder
+ // create topicA with 5 partitions, 2 RF and ensure that the
user-topic-partitions are mapped to
+ // metadata partitions
+ .createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment,
+ replicaAssignment, enableRemoteLogStorage)
+ .expectUserTopicMappedToMetadataPartitions(topicA,
metadataPartitions)
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, 1, 1, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0,
Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // The newly created replica gets mapped to one of the
metadata partition which is being actively
+ // consumed by both the brokers
+ .reassignReplica(topicB, p0, replicaIds())
+ .expectLeader(topicB, p0, broker1, true)
+ // produce some more events and verify the earliest local
offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from
local and remote storage
+ .expectFetchFromTieredStorage(broker1, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+
+ /**
+ * Replicas of the topic
+ * @return the replica-ids of the topic
+ */
+ protected abstract List<Integer> replicaIds();
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
index e3453bdc5ee..fb32194191c 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
@@ -83,7 +83,8 @@ public final class PartitionsExpandTest extends
TieredStorageTestHarness {
new KeyValueSpec("k2", "v2"))
// produce some more events to partition 0 and expect the
segments to be offloaded
- // NOTE: Support needs to be added to capture the offloaded
segment event for already sent message (k2, v2)
+ // KAFKA-15431: Support needs to be added to capture the
offloaded segment event for already sent
+ // message (k2, v2)
// .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new
KeyValueSpec("k4", "v4"))
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
new file mode 100644
index 00000000000..22b3b3da463
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import java.util.Arrays;
+import java.util.List;
+
+public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {
+
+ /**
+ * Expand the replication factor of the topic by changing the replica list
from 0 to 0, 1
+ * @return the replica-ids of the topic
+ */
+ @Override
+ protected List<Integer> replicaIds() {
+ return Arrays.asList(broker0, broker1);
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
new file mode 100644
index 00000000000..94cde9f503f
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {
+
+ /**
+ * Move the replica of the topic from broker0 to broker1
+ * @return the replica-ids of the topic
+ */
+ @Override
+ protected List<Integer> replicaIds() {
+ return Collections.singletonList(broker1);
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
similarity index 54%
copy from
storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
copy to
storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
index e3453bdc5ee..f19a545ba14 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
@@ -23,17 +23,31 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
-public final class PartitionsExpandTest extends TieredStorageTestHarness {
+public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
+ /**
+ * Cluster of two brokers
+ * @return number of brokers in the cluster
+ */
@Override
public int brokerCount() {
return 2;
}
+ /**
+ * Number of partitions in the '__remote_log_metadata' topic
+ * @return number of partitions in the '__remote_log_metadata' topic
+ */
+ @Override
+ public int numRemoteLogMetadataPartitions() {
+ return 2;
+ }
+
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker0 = 0;
@@ -41,66 +55,52 @@ public final class PartitionsExpandTest extends
TieredStorageTestHarness {
final String topicA = "topicA";
final Integer p0 = 0;
final Integer p1 = 1;
- final Integer p2 = 2;
- final Integer partitionCount = 1;
+ final Integer partitionCount = 2;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
- final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
- final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
- final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
+ final Map<Integer, List<Integer>> replicaAssignment = mkMap(
+ mkEntry(p0, Arrays.asList(broker0, broker1)),
+ mkEntry(p1, Arrays.asList(broker1, broker0))
+ );
builder
+ // create topicA with 2 partitions and 2 RF
.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment,
- Collections.singletonMap(p0, p0Assignment),
enableRemoteLogStorage)
- // produce events to partition 0
+ replicaAssignment, enableRemoteLogStorage)
+ // send records to partition 0, expect that the segments are
uploaded and removed from local log dir
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
- // expand the topicA partition-count to 3
- .createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment),
mkEntry(p2, p2Assignment)))
- // consume from the beginning of the topic to read data from
local and remote storage for partition 0
- .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
- .consume(topicA, p0, 0L, 3, 2)
-
- .expectLeader(topicA, p1, broker0, false)
- .expectLeader(topicA, p2, broker1, false)
-
- // produce events to partition 1
- .expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new
KeyValueSpec("k0", "v0"))
- .expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new
KeyValueSpec("k1", "v1"))
+ // send records to partition 1, expect that the segments are
uploaded and removed from local log dir
+ .expectSegmentToBeOffloaded(broker1, topicA, p1, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker1, topicA, p1, 1, new
KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
-
- // produce events to partition 2
- .expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new
KeyValueSpec("k0", "v0"))
- .expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new
KeyValueSpec("k1", "v1"))
- .expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L)
- .produce(topicA, p2, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
- new KeyValueSpec("k2", "v2"))
-
- // produce some more events to partition 0 and expect the
segments to be offloaded
- // NOTE: Support needs to be added to capture the offloaded
segment event for already sent message (k2, v2)
- // .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
- .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
- .expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new
KeyValueSpec("k4", "v4"))
- .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 5L)
- .produce(topicA, p0, new KeyValueSpec("k3", "v3"), new
KeyValueSpec("k4", "v4"),
- new KeyValueSpec("k5", "v5"))
-
- // consume from the beginning of the topic to read data from
local and remote storage for partition 0
- .expectFetchFromTieredStorage(broker0, topicA, p0, 5)
- .consume(topicA, p0, 0L, 6, 5)
-
- // consume from the beginning of the topic to read data from
local and remote storage for partition 1
- .expectFetchFromTieredStorage(broker0, topicA, p1, 2)
- .consume(topicA, p1, 0L, 3, 2)
-
- // consume from the middle of the topic for partition 2
- .expectFetchFromTieredStorage(broker1, topicA, p2, 1)
- .consume(topicA, p2, 1L, 2, 1);
+ // shrink the replication factor to 1
+ .shrinkReplica(topicA, p0, Collections.singletonList(broker1))
+ .shrinkReplica(topicA, p1, Collections.singletonList(broker0))
+ .expectLeader(topicA, p0, broker1, false)
+ .expectLeader(topicA, p1, broker0, false)
+ // produce some more events to partition 0
+ // KAFKA-15431: Support needs to be added to capture the
offloaded segment event for already sent
+ // message (k2, v2)
+ // .expectSegmentToBeOffloaded(broker1, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+ .produce(topicA, p0, new KeyValueSpec("k3", "v3"))
+ // produce some more events to partition 1
+ // KAFKA-15431: Support needs to be added to capture the
offloaded segment event for already sent
+ // message (k2, v2)
+ // .expectSegmentToBeOffloaded(broker0, topicA, p1, 2, new
KeyValueSpec("k2", "v2"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 3L)
+ .produce(topicA, p1, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from
local and remote storage
+ .expectFetchFromTieredStorage(broker1, topicA, p0, 3)
+ .consume(topicA, p0, 0L, 4, 3)
+ .expectFetchFromTieredStorage(broker0, topicA, p1, 3)
+ .consume(topicA, p1, 0L, 4, 3);
}
}