This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f7e8d478a9 KAFKA-16855: remote log disable policy in KRaft (#16653)
9f7e8d478a9 is described below
commit 9f7e8d478a9252f03f6adae3736d13799f137d6a
Author: Luke Chen <[email protected]>
AuthorDate: Sat Aug 3 17:38:41 2024 +0900
KAFKA-16855: remote log disable policy in KRaft (#16653)
Reviewers: Kamal Chandraprakash <[email protected]>, Christo
Lolov <[email protected]>
---
.../apache/kafka/common/config/TopicConfig.java | 19 ++--
.../java/kafka/log/remote/RemoteLogManager.java | 80 +++++++++-----
core/src/main/scala/kafka/log/LogManager.scala | 10 +-
.../main/scala/kafka/server/ConfigHandler.scala | 62 ++++++++---
.../server/ControllerConfigurationValidator.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 5 +-
core/src/main/scala/kafka/zk/AdminZkClient.scala | 5 +-
.../kafka/log/remote/RemoteLogManagerTest.java | 15 +--
.../kafka/admin/RemoteTopicCrudTest.scala | 49 +++++++--
.../test/scala/unit/kafka/log/LogConfigTest.scala | 75 +++++++++----
.../test/scala/unit/kafka/log/LogManagerTest.scala | 2 +-
.../ControllerConfigurationValidatorTest.scala | 4 +-
.../kafka/server/DynamicConfigChangeTest.scala | 13 ++-
.../kafka/storage/internals/log/LogConfig.java | 62 +++++++----
.../integration/DisableRemoteLogOnTopicTest.java | 120 +++++++++++++++++++++
15 files changed, 410 insertions(+), 113 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 3689227d1fa..04c6c487cd0 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -93,17 +93,14 @@ public class TopicConfig {
"deletes the old segments. Default value is -2, it represents
`retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
- public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
- public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";
-
- public static final String REMOTE_LOG_DISABLE_POLICY_CONFIG =
"remote.log.disable.policy";
-
- public static final String REMOTE_LOG_DISABLE_POLICY_DOC =
String.format("Determines whether tiered data for a topic should be retained or
" +
- "deleted after tiered storage disablement on a topic. The two
valid options are \"%s\" and \"%s\". If %s is " +
- "selected then all data in remote will be kept post-disablement
and will only be deleted when it breaches expiration " +
- "thresholds. If %s is selected then the data will be made
inaccessible immediately by advancing the log start offset and will be " +
- "deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN,
REMOTE_LOG_DISABLE_POLICY_DELETE,
- REMOTE_LOG_DISABLE_POLICY_RETAIN,
REMOTE_LOG_DISABLE_POLICY_DELETE);
+ public static final String REMOTE_LOG_COPY_DISABLE_CONFIG =
"remote.log.copy.disable";
+ public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines
whether tiered data for a topic should become read only," +
+ " and no more data uploading on a topic.";
+
+ public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG =
"remote.log.delete.on.disable";
+ public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines
whether tiered data for a topic should be " +
+ "deleted after tiered storage is disabled on a topic. This
configuration should be enabled when trying to " +
+ "set `remote.storage.enable` from true to false";
public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f3f0ccc11ad..f53f5a35623 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -437,30 +437,48 @@ public class RemoteLogManager implements Closeable {
throw new KafkaException("RemoteLogManager is not configured when
remote storage system is enabled");
}
- Set<TopicIdPartition> leaderPartitions =
filterPartitions(partitionsBecomeLeader)
- .map(p -> new TopicIdPartition(topicIds.get(p.topic()),
p.topicPartition())).collect(Collectors.toSet());
+ Map<TopicIdPartition, Boolean> leaderPartitions =
filterPartitions(partitionsBecomeLeader)
+ .collect(Collectors.toMap(p -> new
TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+ p -> p.log().exists(log ->
log.config().remoteLogCopyDisable())));
- Set<TopicIdPartition> followerPartitions =
filterPartitions(partitionsBecomeFollower)
- .map(p -> new TopicIdPartition(topicIds.get(p.topic()),
p.topicPartition())).collect(Collectors.toSet());
+ Map<TopicIdPartition, Boolean> followerPartitions =
filterPartitions(partitionsBecomeFollower)
+ .collect(Collectors.toMap(p -> new
TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+ p -> p.log().exists(log ->
log.config().remoteLogCopyDisable())));
if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
LOGGER.debug("Effective topic partitions after filtering compact
and internal topics, leaders: {} and followers: {}",
leaderPartitions, followerPartitions);
- leaderPartitions.forEach(this::cacheTopicPartitionIds);
- followerPartitions.forEach(this::cacheTopicPartitionIds);
+ leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
+ followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
-
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions,
followerPartitions);
- followerPartitions.forEach(this::doHandleFollowerPartition);
+
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(),
followerPartitions.keySet());
+ followerPartitions.forEach((tp, __) ->
doHandleFollowerPartition(tp));
// If this node was the previous leader for the partition, then
the RLMTask might be running in the
// background thread and might emit metrics. So, removing the
metrics after marking this node as follower.
-
followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
+ followerPartitions.forEach((tp, __) ->
removeRemoteTopicPartitionMetrics(tp));
leaderPartitions.forEach(this::doHandleLeaderPartition);
}
}
+ public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
+ for (Partition partition : partitions) {
+ TopicPartition tp = partition.topicPartition();
+ if (topicIdByPartitionMap.containsKey(tp)) {
+ TopicIdPartition tpId = new
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+ leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition,
task) -> {
+ LOGGER.info("Cancelling the copy RLM task for tpId: {}",
tpId);
+ task.cancel();
+ LOGGER.info("Resetting remote copy lag metrics for tpId:
{}", tpId);
+ ((RLMCopyTask) task.rlmTask).resetLagStats();
+ return null;
+ });
+ }
+ }
+ }
+
/**
* Stop the remote-log-manager task for the given partitions. And, calls
the
* {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link
StopPartition#deleteLocalLog()} is true.
@@ -507,16 +525,18 @@ public class RemoteLogManager implements Closeable {
LOGGER.error("Error while stopping the partition: {}",
stopPartition, ex);
}
}
- // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is
true but not the other way around.
- Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
- .filter(sp -> sp.deleteLocalLog() &&
topicIdByPartitionMap.containsKey(sp.topicPartition()))
+
+ // We want to remove topicId map and stopPartition on RLMM for
deleteLocalLog or stopRLMM partitions because
+ // in both cases, they all mean the topic will not be held in this
broker anymore.
+ // NOTE: In ZK mode, this#stopPartitions method is called when Replica
state changes to Offline and ReplicaDeletionStarted
+ Set<TopicIdPartition> pendingActionsPartitions =
stopPartitions.stream()
+ .filter(sp -> (sp.stopRemoteLogMetadataManager() ||
sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition()))
.map(sp -> new
TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()),
sp.topicPartition()))
.collect(Collectors.toSet());
- if (!deleteLocalPartitions.isEmpty()) {
- // NOTE: In ZK mode, this#stopPartitions method is called when
Replica state changes to Offline and
- // ReplicaDeletionStarted
- remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
- deleteLocalPartitions.forEach(tpId ->
topicIdByPartitionMap.remove(tpId.topicPartition()));
+
+ if (!pendingActionsPartitions.isEmpty()) {
+ pendingActionsPartitions.forEach(tpId ->
topicIdByPartitionMap.remove(tpId.topicPartition()));
+
remoteLogMetadataManager.onStopPartitions(pendingActionsPartitions);
}
}
@@ -986,6 +1006,13 @@ public class RemoteLogManager implements Closeable {
}
}
+ void resetLagStats() {
+ String topic = topicIdPartition.topic();
+ int partition = topicIdPartition.partition();
+ brokerTopicStats.recordRemoteCopyLagBytes(topic, partition, 0);
+ brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0);
+ }
+
private Path toPathIfExists(File file) {
return file.exists() ? file.toPath() : null;
}
@@ -1794,20 +1821,23 @@ public class RemoteLogManager implements Closeable {
new RemoteLogReader(fetchInfo, this, callback,
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
}
- void doHandleLeaderPartition(TopicIdPartition topicPartition) {
+ void doHandleLeaderPartition(TopicIdPartition topicPartition, Boolean
remoteLogCopyDisable) {
RLMTaskWithFuture followerRLMTaskWithFuture =
followerRLMTasks.remove(topicPartition);
if (followerRLMTaskWithFuture != null) {
LOGGER.info("Cancelling the follower task: {}",
followerRLMTaskWithFuture.rlmTask);
followerRLMTaskWithFuture.cancel();
}
- leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition ->
{
- RLMCopyTask task = new RLMCopyTask(topicIdPartition,
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
- // set this upfront when it is getting initialized instead of
doing it after scheduling.
- LOGGER.info("Created a new copy task: {} and getting scheduled",
task);
- ScheduledFuture<?> future =
rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs,
TimeUnit.MILLISECONDS);
- return new RLMTaskWithFuture(task, future);
- });
+ // Only create the copy task when remoteLogCopyDisable is false
+ if (!remoteLogCopyDisable) {
+ leaderCopyRLMTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
+ RLMCopyTask task = new RLMCopyTask(topicIdPartition,
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
+ // set this upfront when it is getting initialized instead of
doing it after scheduling.
+ LOGGER.info("Created a new copy task: {} and getting
scheduled", task);
+ ScheduledFuture<?> future =
rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs,
TimeUnit.MILLISECONDS);
+ return new RLMTaskWithFuture(task, future);
+ });
+ }
leaderExpirationRLMTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
RLMExpirationTask task = new RLMExpirationTask(topicIdPartition);
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 95d2062bf89..66fedbfae6d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -964,15 +964,23 @@ class LogManager(logDirs: Seq[File],
*/
def updateTopicConfig(topic: String,
newTopicConfig: Properties,
- isRemoteLogStorageSystemEnabled: Boolean): Unit = {
+ isRemoteLogStorageSystemEnabled: Boolean,
+ wasRemoteLogEnabled: Boolean,
+ fromZK: Boolean): Unit = {
topicConfigUpdated(topic)
val logs = logsByTopic(topic)
// Combine the default properties with the overrides in zk to create the
new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals,
newTopicConfig)
+ val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
// We would like to validate the configuration no matter whether the logs
have materialised on disk or not.
// Otherwise we risk someone creating a tiered-topic, disabling Tiered
Storage cluster-wide and the check
// failing since the logs for the topic are non-existent.
LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(),
isRemoteLogStorageSystemEnabled, true)
+ // `remote.log.delete.on.disable` and `remote.log.copy.disable` are
unsupported in ZK mode
+ if (fromZK) {
+
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
+ }
+ LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(),
wasRemoteLogEnabled, isRemoteLogStorageEnabled)
if (logs.nonEmpty) {
logs.foreach { log =>
val oldLogConfig = log.updateConfig(newLogConfig)
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 2d478cfc30a..9f183b63ea6 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -26,14 +26,14 @@ import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Implicits._
import kafka.utils.Logging
-import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs,
ZooKeeperInternals}
+import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs,
ZooKeeperInternals}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.ClientMetricsManager
-import org.apache.kafka.storage.internals.log.ThrottledReplicaListValidator
+import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason,
ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import scala.annotation.nowarn
@@ -68,25 +68,61 @@ class TopicConfigHandler(private val replicaManager:
ReplicaManager,
}
val logs = logManager.logsByTopic(topic)
- val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+ val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+ val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
- logManager.updateTopicConfig(topic, props,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
- maybeBootstrapRemoteLogComponents(topic, logs,
wasRemoteLogEnabledBeforeUpdate)
+ // kafkaController is only defined in Zookeeper's mode
+ logManager.updateTopicConfig(topic, props,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
+ wasRemoteLogEnabled, kafkaController.isDefined)
+ maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled,
wasCopyDisabled)
}
- private[server] def maybeBootstrapRemoteLogComponents(topic: String,
- logs: Seq[UnifiedLog],
-
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+ private[server] def maybeUpdateRemoteLogComponents(topic: String,
+ logs: Seq[UnifiedLog],
+ wasRemoteLogEnabled:
Boolean,
+ wasCopyDisabled:
Boolean): Unit = {
val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+ val isCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
+ val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable())
+
+ val (leaderPartitions, followerPartitions) =
+ logs.flatMap(log =>
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+
// Topic configs gets updated incrementally. This check is added to
prevent redundant updates.
- if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
- val (leaderPartitions, followerPartitions) =
- logs.flatMap(log =>
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+ // When remote log is enabled, or remote copy is enabled, we should create
RLM tasks accordingly via `onLeadershipChange`.
+ if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled &&
!isCopyDisabled))) {
val topicIds = Collections.singletonMap(topic,
replicaManager.metadataCache.getTopicId(topic))
replicaManager.remoteLogManager.foreach(rlm =>
rlm.onLeadershipChange(leaderPartitions.toSet.asJava,
followerPartitions.toSet.asJava, topicIds))
- } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
- warn(s"Disabling remote log on the topic: $topic is not supported.")
+ }
+
+ // When copy is disabled, we should stop leaderCopyRLMTask, but keep
expirationTask
+ if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
+ replicaManager.remoteLogManager.foreach(rlm => {
+ rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava);
+ })
+ }
+
+ // Disabling remote log storage on this topic
+ if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) {
+ val stopPartitions: java.util.HashSet[StopPartition] = new
java.util.HashSet[StopPartition]()
+ leaderPartitions.foreach(partition => {
+ // delete remote logs and stop RemoteLogMetadataManager
+ stopPartitions.add(StopPartition(partition.topicPartition,
deleteLocalLog = false,
+ deleteRemoteLog = true, stopRemoteLogMetadataManager = true))
+ })
+
+ followerPartitions.foreach(partition => {
+ // we need to cancel follower tasks and stop RemoteLogMetadataManager
+ stopPartitions.add(StopPartition(partition.topicPartition,
deleteLocalLog = false,
+ deleteRemoteLog = false, stopRemoteLogMetadataManager = true))
+ })
+
+ // update the log start offset to local log start offset for the leader
replicas
+ logs.filter(log => leaderPartitions.exists(p =>
p.topicPartition.equals(log.topicPartition)))
+ .foreach(log =>
log.maybeIncrementLogStartOffset(log.localLogStartOffset(),
LogStartOffsetIncrementReason.SegmentDeletion))
+
+ replicaManager.remoteLogManager.foreach(rlm =>
rlm.stopPartitions(stopPartitions, (_, _) => {}))
}
}
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 06a60e30076..49083e3a0cd 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -109,7 +109,7 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
false)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ebd151a032..11f2e4ff03c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -99,7 +99,10 @@ case class LogDeleteRecordsResult(requestedOffset: Long,
lowWatermark: Long, exc
}
}
-case class StopPartition(topicPartition: TopicPartition, deleteLocalLog:
Boolean, deleteRemoteLog: Boolean = false)
+case class StopPartition(topicPartition: TopicPartition,
+ deleteLocalLog: Boolean,
+ deleteRemoteLog: Boolean = false,
+ stopRemoteLogMetadataManager: Boolean = false)
/**
* Result metadata of a log read operation on the log
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 8db20583e24..15c95b998b6 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -163,7 +163,8 @@ class AdminZkClient(zkClient: KafkaZkClient,
LogConfig.validate(Collections.emptyMap(), config,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()),
+ true)
}
private def writeTopicPartitionAssignment(topic: String, replicaAssignment:
Map[Int, ReplicaAssignment],
@@ -481,7 +482,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
// remove the topic overrides
LogConfig.validate(Collections.emptyMap(), configs,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()),
true)
}
/**
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4e5f70f9337..f27e39c1f1d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1309,12 +1309,12 @@ public class RemoteLogManagerTest {
verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
// Evicts from topicId cache
- remoteLogManager.stopPartitions(Collections.singleton(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex)
-> { });
+ remoteLogManager.stopPartitions(Collections.singleton(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)), (tp,
ex) -> { });
verifyNotInCache(leaderTopicIdPartition);
verifyInCache(followerTopicIdPartition);
// Evicts from topicId cache
- remoteLogManager.stopPartitions(Collections.singleton(new
StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex)
-> { });
+ remoteLogManager.stopPartitions(Collections.singleton(new
StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)),
(tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
}
@@ -1344,7 +1344,7 @@ public class RemoteLogManagerTest {
spyRemoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.emptySet(), topicIds);
-
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition));
+
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition),
eq(false));
}
private MemoryRecords records(long timestamp,
@@ -1837,8 +1837,8 @@ public class RemoteLogManagerTest {
remoteLogManager.startup();
BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition,
throwable) -> fail("shouldn't be called");
Set<StopPartition> partitions = new HashSet<>();
- partitions.add(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, false));
- partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, false));
+ partitions.add(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false));
+ partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, false, false));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@@ -1860,8 +1860,8 @@ public class RemoteLogManagerTest {
BiConsumer<TopicPartition, Throwable> errorHandler =
(topicPartition, ex) -> fail("shouldn't be called: " + ex);
Set<StopPartition> partitions = new HashSet<>();
- partitions.add(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, true));
- partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, true));
+ partitions.add(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
+ partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@@ -3208,6 +3208,7 @@ public class RemoteLogManagerTest {
when(partition.topic()).thenReturn(tp.topic());
when(log.remoteLogEnabled()).thenReturn(true);
when(partition.log()).thenReturn(Option.apply(log));
+ when(log.config()).thenReturn(new LogConfig(new Properties()));
return partition;
}
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 462698734a0..25f366bfc94 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -156,12 +156,13 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig = topicConfig))
}
+ // `remote.log.delete.on.disable` and `remote.log.copy.disable` only work
in KRaft mode.
@ParameterizedTest
- @CsvSource(Array("zk,retain", "zk,delete", "kraft,retain", "kraft,delete"))
- def testCreateRemoteTopicWithDisablePolicyRetain(quorum: String, policy:
String): Unit = {
+ @CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true",
"kraft,false,false"))
+ def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String,
copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
val topicConfig = new Properties()
- topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- topicConfig.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
+ topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
copyDisabled.toString)
+ topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
deleteOnDisable.toString)
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
@@ -311,6 +312,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Disabling
remote storage feature on the topic level is not supported.")
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
+ def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
+ val admin = createAdminClient()
+ val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or
`remote.log.copy.disable` under Zookeeper's mode."
+ val topicConfig = new Properties
+ topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET),
+ ))
+ assertThrowsException(classOf[InvalidConfigurationException],
+ () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
+
+ configs.clear()
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET),
+ ))
+ assertThrowsException(classOf[InvalidConfigurationException],
+ () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = {
@@ -409,10 +439,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong
==
logBuffer.head.config.retentionSize
}
- if
(topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) {
+ if (topicConfig.contains(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) {
+ result = result &&
+
topicConfig.getProperty(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG).toBoolean ==
+ logBuffer.head.config.remoteLogCopyDisable()
+ }
+ if
(topicConfig.contains(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG)) {
result = result &&
-
topicConfig.getProperty(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG).equals(
- logBuffer.head.config.remoteLogDisablePolicy())
+
topicConfig.getProperty(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG).toBoolean
==
+ logBuffer.head.config.remoteLogDeleteOnDisable()
}
}
result
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 4e7e4e23b38..8126bb08b07 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -100,7 +100,8 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "-2")
case TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "-1")
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "-0.1")
- case TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "remove", "0", "true")
+ case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "remove", "0")
+ case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "remove", "0")
case _ => assertPropertyInvalid(name, "not_a_number", "-1")
})
@@ -300,7 +301,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG,
localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
}
@Test
@@ -312,17 +313,17 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
}
@ParameterizedTest(name = "testEnableRemoteLogStorage with
sysRemoteStorageEnabled: {0}")
@@ -335,10 +336,10 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
} else {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
assertTrue(message.getMessage.contains("Tiered Storage functionality is
disabled in the broker"))
}
}
@@ -355,11 +356,20 @@ class LogConfigTest {
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
() =>
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
- logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
- assertTrue(message.getMessage.contains("Disabling remote storage feature
on the topic level is not supported."))
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
+ assertTrue(message.getMessage.contains("It is invalid to disable remote
storage without deleting remote data. " +
+ "If you want to keep the remote data and turn to read only, please set
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+ "If you want to disable remote storage and delete all remote data,
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
+
+
+ // It should be able to disable the remote log storage when delete on
disable is set to true
+ logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
+
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"), logProps,
+ kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
}
}
@@ -378,10 +388,12 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true)
}
}
@@ -400,10 +412,12 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true)
}
}
@@ -426,19 +440,34 @@ class LogConfigTest {
}
@ParameterizedTest
- @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE))
- def testValidRemoteLogDisablePolicy(policy: String): Unit = {
+ @ValueSource(booleans = Array(true, false))
+ def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = {
val logProps = new Properties
- logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
+ logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
copyDisabled.toString)
LogConfig.validate(logProps)
}
@ParameterizedTest
- @ValueSource(strings = Array("keep", "remove"))
- def testInvalidRemoteLogDisablePolicy(policy: String): Unit = {
+ @ValueSource(booleans = Array(true, false))
+ def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = {
val logProps = new Properties
- logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
- assertThrows(classOf[ConfigException], () => LogConfig.validate(logProps))
+ logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
deleteOnDisable.toString)
+ LogConfig.validate(logProps)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings =
Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
+ def testInValidRemoteConfigsInZK(configKey: String): Unit = {
+ val kafkaProps = TestUtils.createDummyBrokerConfig()
+
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
+ val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+ val logProps = new Properties
+ logProps.put(configKey, "true")
+
+ val message = assertThrows(classOf[InvalidConfigurationException],
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap, true, true))
+ assertTrue(message.getMessage.contains("It is invalid to set
`remote.log.delete.on.disable` or " +
+ "`remote.log.copy.disable` under Zookeeper's mode."))
}
/* Verify that when the deprecated config
LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new
configs
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b5a029a7411..ceb2342446f 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -797,7 +797,7 @@ class LogManagerTest {
val newProperties = new Properties()
newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE)
- spyLogManager.updateTopicConfig(topic, newProperties,
isRemoteLogStorageSystemEnabled = false)
+ spyLogManager.updateTopicConfig(topic, newProperties,
isRemoteLogStorageSystemEnabled = false, wasRemoteLogEnabled = false, fromZK =
false)
assertTrue(log0.config.delete)
assertTrue(log1.config.delete)
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 250f07ca23e..4cf5ad70cee 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -93,7 +93,9 @@ class ControllerConfigurationValidatorTest {
val config = new util.TreeMap[String, String]()
config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
if (wasRemoteStorageEnabled) {
- assertEquals("Disabling remote storage feature on the topic level is not
supported.",
+ assertEquals("It is invalid to disable remote storage without deleting
remote data. " +
+ "If you want to keep the remote data and turn to read only, please set
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+ "If you want to disable remote storage and delete all remote data,
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.",
assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
new ConfigResource(TOPIC, "foo"), config,
util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"))).getMessage)
} else {
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cc4623b1277..dcec4a75d96 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs,
ServerLogConfigs, ZooKeeperInternals}
+import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
@@ -659,6 +660,7 @@ class DynamicConfigChangeUnitTest {
when(log0.remoteLogEnabled()).thenReturn(true)
when(partition0.isLeader).thenReturn(true)
when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
+ when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
val tp1 = new TopicPartition(topic, 1)
val log1: UnifiedLog = mock(classOf[UnifiedLog])
@@ -667,6 +669,7 @@ class DynamicConfigChangeUnitTest {
when(log1.remoteLogEnabled()).thenReturn(true)
when(partition1.isLeader).thenReturn(false)
when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
+ when(log1.config).thenReturn(new LogConfig(Collections.emptyMap()))
val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] =
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] =
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
@@ -674,7 +677,7 @@ class DynamicConfigChangeUnitTest {
val isRemoteLogEnabledBeforeUpdate = false
val configHandler: TopicConfigHandler = new
TopicConfigHandler(replicaManager, null, null, None)
- configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1),
isRemoteLogEnabledBeforeUpdate)
+ configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1),
isRemoteLogEnabledBeforeUpdate, false)
assertEquals(Collections.singleton(partition0),
leaderPartitionsArg.getValue)
assertEquals(Collections.singleton(partition1),
followerPartitionsArg.getValue)
}
@@ -682,17 +685,23 @@ class DynamicConfigChangeUnitTest {
@Test
def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = {
val topic = "test-topic"
+ val tp0 = new TopicPartition(topic, 0)
val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ val partition: Partition = mock(classOf[Partition])
when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+ when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition))
val log0: UnifiedLog = mock(classOf[UnifiedLog])
when(log0.remoteLogEnabled()).thenReturn(true)
doNothing().when(rlm).onLeadershipChange(any(), any(), any())
+ when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
+ when(log0.topicPartition).thenReturn(tp0)
+ when(partition.isLeader).thenReturn(true)
val isRemoteLogEnabledBeforeUpdate = true
val configHandler: TopicConfigHandler = new
TopicConfigHandler(replicaManager, null, null, None)
- configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0),
isRemoteLogEnabledBeforeUpdate)
+ configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0),
isRemoteLogEnabledBeforeUpdate, false)
verify(rlm, never()).onLeadershipChange(any(), any(), any())
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index d026ea3b4b1..a0ca52678ad 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -113,13 +113,15 @@ public class LogConfig extends AbstractConfig {
public static class RemoteLogConfig {
public final boolean remoteStorageEnable;
- public final String remoteLogDisablePolicy;
+ public final boolean remoteLogDeleteOnDisable;
+ public final boolean remoteLogCopyDisable;
public final long localRetentionMs;
public final long localRetentionBytes;
private RemoteLogConfig(LogConfig config) {
this.remoteStorageEnable =
config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- this.remoteLogDisablePolicy =
config.getString(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG);
+ this.remoteLogCopyDisable =
config.getBoolean(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG);
+ this.remoteLogDeleteOnDisable =
config.getBoolean(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG);
this.localRetentionMs =
config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
this.localRetentionBytes =
config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
}
@@ -128,7 +130,8 @@ public class LogConfig extends AbstractConfig {
public String toString() {
return "RemoteLogConfig{" +
"remoteStorageEnable=" + remoteStorageEnable +
- ", remoteLogDisablePolicy=" + remoteLogDisablePolicy +
+ ", remoteLogCopyDisable=" + remoteLogCopyDisable +
+ ", remoteLogDeleteOnDisable=" + remoteLogDeleteOnDisable +
", localRetentionMs=" + localRetentionMs +
", localRetentionBytes=" + localRetentionBytes +
'}';
@@ -204,7 +207,8 @@ public class LogConfig extends AbstractConfig {
// Visible for testing
public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS =
Collections.unmodifiableSet(Utils.mkSet(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
- TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG,
+ TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
+ TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
));
@@ -325,9 +329,8 @@ public class LogConfig extends AbstractConfig {
TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG,
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
- .define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING,
TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
- in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
- MEDIUM, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DOC);
+ .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN,
false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
+ .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
}
public final Set<String> overriddenConfigs;
@@ -508,8 +511,12 @@ public class LogConfig extends AbstractConfig {
return remoteLogConfig.remoteStorageEnable;
}
- public String remoteLogDisablePolicy() {
- return remoteLogConfig.remoteLogDisablePolicy;
+ public Boolean remoteLogDeleteOnDisable() {
+ return remoteLogConfig.remoteLogDeleteOnDisable;
+ }
+
+ public Boolean remoteLogCopyDisable() {
+ return remoteLogConfig.remoteLogCopyDisable;
}
public long localRetentionMs() {
@@ -613,11 +620,17 @@ public class LogConfig extends AbstractConfig {
* @param existingConfigs The existing properties
* @param newConfigs The new properties to be
validated
* @param isRemoteLogStorageSystemEnabled true if system wise remote log
storage is enabled
+ * @param fromZK true if this is a ZK cluster
*/
private static void validateTopicLogConfigValues(Map<String, String>
existingConfigs,
Map<?, ?> newConfigs,
- boolean
isRemoteLogStorageSystemEnabled) {
+ boolean
isRemoteLogStorageSystemEnabled,
+ boolean fromZK) {
validateValues(newConfigs);
+
+ if (fromZK) {
+ validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
+ }
boolean isRemoteLogStorageEnabled = (Boolean)
newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(newConfigs,
isRemoteLogStorageSystemEnabled, false);
@@ -626,14 +639,26 @@ public class LogConfig extends AbstractConfig {
validateRemoteStorageRetentionTime(newConfigs);
} else {
// The new config "remote.storage.enable" is false, validate if
it's turning from true to false
- validateNotTurningOffRemoteStorage(existingConfigs);
+ boolean wasRemoteLogEnabled =
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"));
+ validateTurningOffRemoteStorageWithDelete(newConfigs,
wasRemoteLogEnabled, isRemoteLogStorageEnabled);
+ }
+ }
+
+ public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?>
newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
+ boolean isRemoteLogDeleteOnDisable = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
false);
+ if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled &&
!isRemoteLogDeleteOnDisable) {
+ throw new InvalidConfigurationException("It is invalid to disable
remote storage without deleting remote data. " +
+ "If you want to keep the remote data and turn to read
only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
+ "If you want to disable remote storage and delete all
remote data, please set
`remote.storage.enable=false,remote.log.delete.on.disable=true`.");
}
}
- public static void validateNotTurningOffRemoteStorage(Map<String, String>
existingConfigs) {
- boolean wasRemoteLogEnabledBeforeUpdate =
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"));
- if (wasRemoteLogEnabledBeforeUpdate) {
- throw new InvalidConfigurationException("Disabling remote storage
feature on the topic level is not supported.");
+ public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?>
newConfigs) {
+ boolean isRemoteLogDeleteOnDisable = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
false);
+ boolean isRemoteLogCopyDisabled = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
false);
+ if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
+ throw new InvalidConfigurationException("It is invalid to set
`remote.log.delete.on.disable` or " +
+ "`remote.log.copy.disable` under Zookeeper's mode.");
}
}
@@ -694,13 +719,14 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and
that all values can be parsed and are valid
*/
public static void validate(Properties props) {
- validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
+ validate(Collections.emptyMap(), props, Collections.emptyMap(), false,
false);
}
public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
- boolean isRemoteLogStorageSystemEnabled) {
+ boolean isRemoteLogStorageSystemEnabled,
+ boolean fromZK) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
@@ -709,7 +735,7 @@ public class LogConfig extends AbstractConfig {
Map<Object, Object> combinedConfigs = new
HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
- validateTopicLogConfigValues(existingConfigs, valueMaps,
isRemoteLogStorageSystemEnabled);
+ validateTopicLogConfigValues(existingConfigs, valueMaps,
isRemoteLogStorageSystemEnabled, fromZK);
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
new file mode 100644
index 00000000000..6bcaea6a363
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 2;
+ }
+
+ @ParameterizedTest(name = "{displayName}.quorum={0}")
+ @ValueSource(strings = {"kraft"})
+ public void executeTieredStorageTest(String quorum) {
+ super.executeTieredStorageTest(quorum);
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final Integer broker0 = 0;
+ final Integer broker1 = 1;
+ final String topicA = "topicA";
+ final Integer p0 = 0;
+ final Integer partitionCount = 1;
+ final Integer replicationFactor = 2;
+ final Integer maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final Map<Integer, List<Integer>> assignment = mkMap(
+ mkEntry(p0, Arrays.asList(broker0, broker1))
+ );
+ final Map<String, String> disableCopy = new HashMap<>();
+ disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
+
+ final Map<String, String> deleteOnDisable = new HashMap<>();
+ deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false");
+ deleteOnDisable.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
"true");
+
+ builder
+ .createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, assignment,
+ enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+ .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // disable remote log copy
+ .updateTopicConfig(topicA,
+
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ Collections.emptyList())
+
+ // make sure we can still consume from the beginning of the
topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+ .consume(topicA, p0, 0L, 3, 2)
+
+ // re-enable remote log copy
+ .updateTopicConfig(topicA,
+
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
+ Collections.emptyList())
+
+ // make sure the logs can be offloaded
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+ .produce(topicA, p0, new KeyValueSpec("k3", "v3"))
+
+ // explicitly disable remote log copy
+ .updateTopicConfig(topicA,
+ disableCopy,
+ Collections.emptyList())
+ // make sure we can still consume from the beginning of the
topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 3)
+ .consume(topicA, p0, 0L, 4, 3)
+
+ // verify the remote retention policy is working.
+ // Use DELETE_RECORDS API to delete the records upto offset 1
and expect 1 remote segment to be deleted
+ .expectDeletionInRemoteStorage(broker0, topicA, p0,
DELETE_SEGMENT, 1)
+ .deleteRecords(topicA, p0, 1L)
+ .waitForRemoteLogSegmentDeletion(topicA)
+
+ // disabling remote log on topicA and enabling deleteOnDisable
+ .updateTopicConfig(topicA,
+ deleteOnDisable,
+ Collections.emptyList())
+ // make sure all remote data is deleted
+ .expectEmptyRemoteStorage(topicA, p0)
+ // verify the local log is still consumable
+ .consume(topicA, p0, 3L, 1, 0);
+ }
+}