This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1a664f5d68 KAFKA-19858 Set default min.insync.replicas=2 for
__remote_log_metadata topic to prevent data loss (KIP-1235) (#20811)
f1a664f5d68 is described below
commit f1a664f5d68762334d863f717cf7a95bcf0135a3
Author: Jian <[email protected]>
AuthorDate: Mon Dec 29 11:12:25 2025 +0800
KAFKA-19858 Set default min.insync.replicas=2 for __remote_log_metadata
topic to prevent data loss (KIP-1235) (#20811)
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw)
The __remote_log_metadata internal topic currently lacks a configurable
min.insync.replicas setting and relies on the broker-level default
(typically 1), even though the topic's default replication factor is 3.
This creates a data-loss risk in production environments, because writes
may be acknowledged by only a single replica.
This is also inconsistent with other internal topics such as
__transaction_state, which explicitly sets min.insync.replicas=2 via the
transaction.state.log.min.isr broker configuration. Both topics store
critical metadata and should have equivalent durability guarantees.
Note: the topic's default replication factor is already 3:
```
public static final String
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP =
"rlmm.config.remote.log.metadata.topic.replication.factor";
public static final short
DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
```
Reviewers: Kamal Chandraprakash <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
docs/getting-started/upgrade.md | 2 +-
docs/operations/tiered-storage.md | 6 ++-
.../TopicBasedRemoteLogMetadataManager.java | 1 +
.../TopicBasedRemoteLogMetadataManagerConfig.java | 12 +++++
.../storage/RemoteLogMetadataManagerTestUtils.java | 2 +-
...picBasedRemoteLogMetadataManagerConfigTest.java | 23 +++++++++
.../TopicBasedRemoteLogMetadataManagerTest.java | 56 ++++++++++++++++++++++
7 files changed, 99 insertions(+), 3 deletions(-)
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 1adf5110659..3ae4c68ce6f 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -33,7 +33,7 @@ type: docs
### Notable changes in 4.3.0
* Two new configs have been introduced:
`group.coordinator.cached.buffer.max.bytes` and
`share.coordinator.cached.buffer.max.bytes`. They allow the respective
coordinators to set the maximum buffer size retained for reuse. For further
details, please refer to
[KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
-
+ * A new config has been introduced: `remote.log.metadata.topic.min.isr`,
with a default value of 2. It is applied when the __remote_log_metadata topic
is created; for an existing __remote_log_metadata topic, you can update
min.insync.replicas via kafka-configs.sh if needed. For further details,
please refer to
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
## Upgrading to 4.2.0
diff --git a/docs/operations/tiered-storage.md
b/docs/operations/tiered-storage.md
index ca6376be566..5da14595279 100644
--- a/docs/operations/tiered-storage.md
+++ b/docs/operations/tiered-storage.md
@@ -89,9 +89,13 @@ After build successfully, there should be a
`kafka-storage-x.x.x-test.jar` file
# Note, please make sure the brokers need to have access to this directory
rsm.config.dir=/tmp/kafka-remote-storage
- # This needs to be changed if number of brokers in the cluster is more
than 1
+ # Default is 3, which requires at least 3 brokers. For a single-broker
cluster, set this to 1.
rlmm.config.remote.log.metadata.topic.replication.factor=1
+ # The minimum number of in-sync replicas that must acknowledge a write to
the remote log metadata topic.
+ # Default is 2. For a single-broker cluster (replication factor = 1),
set this to 1.
+ rlmm.config.remote.log.metadata.topic.min.isr=1
+
# Try to speed up the log retention check interval for testing
log.retention.check.interval.ms=1000
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 1b2b459cb8d..cd2ed674216 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -434,6 +434,7 @@ public class TopicBasedRemoteLogMetadataManager implements
BrokerReadyCallback,
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG,
Long.toString(rlmmConfig.metadataTopicRetentionMs()));
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE);
topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false");
+ topicConfigs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
Short.toString(rlmmConfig.metadataTopicMinIsr()));
return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
rlmmConfig.metadataTopicPartitionsCount(),
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 721d58e34d8..52f934ce299 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -46,6 +46,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
public static final String
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP =
"remote.log.metadata.topic.replication.factor";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP =
"remote.log.metadata.topic.num.partitions";
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP =
"remote.log.metadata.topic.retention.ms";
+ public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP =
"remote.log.metadata.topic.min.isr";
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP =
"remote.log.metadata.consume.wait.ms";
public static final String
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP =
"remote.log.metadata.initialization.retry.max.timeout.ms";
public static final String
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP =
"remote.log.metadata.initialization.retry.interval.ms";
@@ -53,6 +54,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS =
-1L;
public static final short
DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
+ public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR = 2;
public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 *
60 * 1000L;
public static final long
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 *
1000L;
public static final long
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 100L;
@@ -63,6 +65,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
"Default: -1, that means unlimited. Users can configure this value
based on their use cases. " +
"To avoid any data loss, this value should be more than the
maximum retention period of any topic enabled with " +
"tiered storage in the cluster.";
+ public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC = "The
minimum number of replicas that must acknowledge a write to remote log metadata
topic.";
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The
amount of time in milliseconds to wait for the local consumer to " +
"receive the published event.";
public static final String
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval
in milliseconds for " +
@@ -90,6 +93,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG,
DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW,
REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
+ .define(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, SHORT,
DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, atLeast(1), LOW,
+ REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC)
.define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG,
DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
@@ -106,6 +111,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
private final long consumeWaitMs;
private final long metadataTopicRetentionMs;
private final short metadataTopicReplicationFactor;
+ private final short metadataTopicMinIsr;
private final long initializationRetryMaxTimeoutMs;
private final long initializationRetryIntervalMs;
@@ -126,6 +132,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
throw new IllegalArgumentException("Invalid metadata topic
retention in millis: " + metadataTopicRetentionMs);
}
+ metadataTopicMinIsr = (short)
parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP);
consumeWaitMs = (long)
parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
initializationRetryIntervalMs = (long)
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
initializationRetryMaxTimeoutMs = (long)
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
@@ -184,6 +191,10 @@ public final class
TopicBasedRemoteLogMetadataManagerConfig {
return initializationRetryIntervalMs;
}
+ public short metadataTopicMinIsr() {
+ return metadataTopicMinIsr;
+ }
+
public String logDir() {
return logDir;
}
@@ -229,6 +240,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
", consumeWaitMs=" + consumeWaitMs +
", metadataTopicRetentionMs=" + metadataTopicRetentionMs +
", metadataTopicReplicationFactor=" +
metadataTopicReplicationFactor +
+ ", metadataTopicMinIsr=" + metadataTopicMinIsr +
", initializationRetryMaxTimeoutMs=" +
initializationRetryMaxTimeoutMs +
", initializationRetryIntervalMs=" +
initializationRetryIntervalMs +
", commonProps=" + configMapToRedactedString(commonProps,
AdminClientConfig.configDef()) +
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index 8a1f44fcea2..fb8812d6b79 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -34,7 +34,7 @@ import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class RemoteLogMetadataManagerTestUtils {
- private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
+ static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 *
1000L;
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 1874416f683..e3eb05b72b9 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -27,14 +27,17 @@ import org.junit.jupiter.api.Test;
import java.util.AbstractMap;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
@@ -140,6 +143,10 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
assertTrue(configString.contains("enable.auto.commit=false"));
}
+ private Map<String, Object> createValidConfigProps() {
+ return this.createValidConfigProps(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap());
+ }
+
private Map<String, Object> createValidConfigProps(Map<String, Object>
commonClientConfig,
Map<String, Object>
producerConfig,
Map<String, Object>
consumerConfig) {
@@ -192,4 +199,20 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Arrays.stream(sensitiveConfigKeys)
.forEach(config -> assertTrue(configString.contains(config +
"=(redacted)")));
}
+
+ @Test
+ public void testDefaultMinIsr() {
+ Map<String, Object> props = createValidConfigProps();
+ TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
+ assertEquals(DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR,
rlmmConfig.metadataTopicMinIsr());
+ }
+
+ @Test
+ public void testCustomMinIsr() {
+ Map<String, Object> props = createValidConfigProps();
+ short customMinIsr = 3;
+ props.put(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr);
+ TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
+ assertEquals(customMinIsr, rlmmConfig.metadataTopicMinIsr());
+ }
}
\ No newline at end of file
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index f564028ccaf..3ddee1b24f8 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -17,6 +17,9 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -24,6 +27,8 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -49,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -370,4 +376,54 @@ public class TopicBasedRemoteLogMetadataManagerTest {
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicWithDefaultMinIsr() throws
ExecutionException, InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager
= topicBasedRlmm();
+
verifyRemoteLogMetadataTopicWithMinIsr(topicBasedRemoteLogMetadataManager,
+
TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR,
+ "default value");
+ }
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicWithCustomMinIsr() throws
ExecutionException, InterruptedException, IOException {
+ // Create a manager with custom min.isr value
+ short customMinIsr = 3;
+ Map<String, Object> overrideProps = Map.of(
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
customMinIsr
+ );
+ try (TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .overrideRemoteLogMetadataManagerProps(overrideProps)
+ .build()) {
+ verifyRemoteLogMetadataTopicWithMinIsr(customRlmm, customMinIsr,
"custom value");
+ }
+ }
+
+ private void
verifyRemoteLogMetadataTopicWithMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
+ short expectedMinIsr,
+ String
valueDescription)
+ throws
ExecutionException, InterruptedException {
+ try (Admin admin = clusterInstance.admin()) {
+ String metadataTopic =
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+ // Wait for the topic to be created
+ clusterInstance.waitTopicCreation(metadataTopic,
RemoteLogMetadataManagerTestUtils.METADATA_TOPIC_PARTITIONS_COUNT);
+
+ // Verify the topic exists
+ assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
+
+ // Describe the topic configs to verify min.insync.replicas
+ ConfigResource topicResource = new
ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
+ DescribeConfigsResult describeResult =
admin.describeConfigs(List.of(topicResource));
+ Config config = describeResult.all().get().get(topicResource);
+
+ assertNotNull(config, "Topic config should not be null");
+ ConfigEntry minIsrEntry =
config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
+ assertNotNull(minIsrEntry, "min.insync.replicas config should
exist");
+ assertEquals(String.valueOf(expectedMinIsr), minIsrEntry.value(),
+ "min.insync.replicas should be " + expectedMinIsr + " (" +
valueDescription + ")");
+ }
+ }
}