This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 7fe3cec4eb2 KAFKA-17236: Handle local log deletion when
remote.log.copy.disabled=true (#16765)
7fe3cec4eb2 is described below
commit 7fe3cec4eb20ac60291e437a3dd9fa46f9fb2afc
Author: Luke Chen <[email protected]>
AuthorDate: Thu Aug 8 20:37:40 2024 +0900
KAFKA-17236: Handle local log deletion when remote.log.copy.disabled=true
(#16765)
Handle local log deletion when remote.log.copy.disable=true, as specified in
KIP-950.
When tiered storage is disabled or becomes read-only on a topic, the local
retention configuration becomes irrelevant, and all data expiration follows the
topic-wide retention configuration exclusively.
- Added the remoteLogEnabledAndRemoteCopyEnabled method to check whether the
topic has tiered storage enabled and remote log copy enabled.
local.retention.ms/bytes is applied only when remote.storage.enable=true and
remote.log.copy.disable=false.
- Changed to use retention.bytes/retention.ms when remote copy is disabled.
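- Added validation that, when remote.log.copy.disable=true on a tiered topic,
requires local.retention.ms == retention.ms and local.retention.bytes ==
retention.bytes (or either local value left at -2, i.e. derived from the
topic-wide setting)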
- Added tests
Reviewers: Kamal Chandraprakash <[email protected]>, Satish
Duggana <[email protected]>, Christo Lolov <[email protected]>
---
.../apache/kafka/common/config/TopicConfig.java | 4 +-
core/src/main/scala/kafka/log/LogManager.scala | 1 +
core/src/main/scala/kafka/log/UnifiedLog.scala | 24 ++--
.../kafka/admin/RemoteTopicCrudTest.scala | 132 +++++++++++++++++++++
.../test/scala/unit/kafka/log/LogTestUtils.scala | 7 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 82 +++++++++++++
.../kafka/storage/internals/log/LogConfig.java | 21 ++++
.../integration/DisableRemoteLogOnTopicTest.java | 20 +++-
8 files changed, 275 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 04c6c487cd0..b3cea19c551 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -95,7 +95,9 @@ public class TopicConfig {
public static final String REMOTE_LOG_COPY_DISABLE_CONFIG =
"remote.log.copy.disable";
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines
whether tiered data for a topic should become read only," +
- " and no more data uploading on a topic.";
+ " and no more data uploading on a topic. Once this config is set
to true, the local retention configuration " +
+ "(i.e. local.retention.ms/bytes) becomes irrelevant, and all data
expiration follows the topic-wide retention configuration" +
+ "(i.e. retention.ms/bytes).";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG =
"remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines
whether tiered data for a topic should be " +
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 66fedbfae6d..7c04dee1f78 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -981,6 +981,7 @@ class LogManager(logDirs: Seq[File],
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
}
LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(),
wasRemoteLogEnabled, isRemoteLogStorageEnabled)
+
LogConfig.validateRetentionConfigsWhenRemoteCopyDisabled(newLogConfig.values(),
isRemoteLogStorageEnabled)
if (logs.nonEmpty) {
logs.foreach { log =>
val oldLogConfig = log.updateConfig(newLogConfig)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index bef18806b0d..69b6787ca63 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1463,6 +1463,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
+ /**
+ * @return true if this topic enables tiered storage and remote log copy is
enabled (i.e. remote.log.copy.disable=false)
+ */
+ private def remoteLogEnabledAndRemoteCopyEnabled(): Boolean = {
+ remoteLogEnabled() && !config.remoteLogCopyDisable()
+ }
+
/**
* Find segments starting from the oldest until the user-supplied predicate
is false.
* A final segment that is empty will never be returned.
@@ -1477,7 +1484,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// Segments are eligible for deletion when:
// 1. they are uploaded to the remote storage
// 2. log-start-offset was incremented higher than the largest offset
in the candidate segment
- if (remoteLogEnabled()) {
+ // Note: when remote log copy is disabled, we will fall back to local
log check using retention.ms/bytes
+ if (remoteLogEnabledAndRemoteCopyEnabled()) {
(upperBoundOffset > 0 && upperBoundOffset - 1 <=
highestOffsetInRemoteStorage()) ||
allowDeletionDueToLogStartOffsetIncremented
} else {
@@ -1522,7 +1530,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def incrementStartOffset(startOffset: Long, reason:
LogStartOffsetIncrementReason): Unit = {
- if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset,
reason)
+ if (remoteLogEnabledAndRemoteCopyEnabled())
maybeIncrementLocalLogStartOffset(startOffset, reason)
else maybeIncrementLogStartOffset(startOffset, reason)
}
@@ -1570,7 +1578,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteRetentionMsBreachedSegments(): Int = {
- val retentionMs = localRetentionMs(config, remoteLogEnabled())
+ val retentionMs = localRetentionMs(config,
remoteLogEnabledAndRemoteCopyEnabled())
if (retentionMs < 0) return 0
val startMs = time.milliseconds
@@ -1582,7 +1590,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteRetentionSizeBreachedSegments(): Int = {
- val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
+ val retentionSize: Long = localRetentionSize(config,
remoteLogEnabledAndRemoteCopyEnabled())
if (retentionSize < 0 || size < retentionSize) return 0
var diff = size - retentionSize
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]):
Boolean = {
@@ -2338,12 +2346,12 @@ object UnifiedLog extends Logging {
}
}
- private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled:
Boolean): Long = {
- if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else
config.retentionMs
+ private[log] def localRetentionMs(config: LogConfig,
remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
+ if (remoteLogEnabledAndRemoteCopyEnabled)
config.remoteLogConfig.localRetentionMs else config.retentionMs
}
- private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled:
Boolean): Long = {
- if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else
config.retentionSize
+ private[log] def localRetentionSize(config: LogConfig,
remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
+ if (remoteLogEnabledAndRemoteCopyEnabled)
config.remoteLogConfig.localRetentionBytes else config.retentionSize
}
}
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index c88962cb845..2983ba0e307 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -168,6 +168,138 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
+ // `remote.log.delete.on.disable` only works in KRaft mode.
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum:
String): Unit = {
+ val testTopicName2 = testTopicName + "2"
+ val testTopicName3 = testTopicName + "3"
+ val errorMsgMs = "When `remote.log.copy.disable` is set to true, the
`local.retention.ms` and `retention.ms` " +
+ "must be set to the identical value because there will be no more logs
copied to the remote storage."
+
+ // 1. create a topic with `remote.log.copy.disable=true` and have
different local.retention.ms and retention.ms value,
+ // it should fail to create the topic
+ val topicConfig = new Properties()
+ topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+ topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
+
+ val admin = createAdminClient()
+ val err = assertThrowsException(classOf[InvalidConfigurationException],
+ () => TestUtils.createTopicWithAdmin(admin, testTopicName,
brokers, controllerServers, numPartitions,
+ numReplicationFactor, topicConfig = topicConfig))
+ assertEquals(errorMsgMs, err.getMessage)
+
+ // 2. change the local.retention.ms value to the same value as
retention.ms should successfully create the topic
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000")
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ // 3. change the local.retention.ms value to "-2" should also successfully
create the topic
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
+ TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ // 4. create a topic with `remote.log.copy.disable=false` and have
different local.retention.ms and retention.ms value,
+ // it should successfully creates the topic.
+ topicConfig.clear()
+ topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+ topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
+ TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ // 5. alter the config to `remote.log.copy.disable=true`, it should fail
the config change
+ val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET),
+ ))
+ val err2 = assertThrowsException(classOf[InvalidConfigurationException],
+ () => admin.incrementalAlterConfigs(configs).all().get())
+ assertEquals(errorMsgMs, err2.getMessage)
+
+ // 6. alter the config to `remote.log.copy.disable=true` and
local.retention.ms == retention.ms, it should work without error
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000"),
+ AlterConfigOp.OpType.SET),
+ ))
+
+ admin.incrementalAlterConfigs(configs).all().get()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum:
String): Unit = {
+ val testTopicName2 = testTopicName + "2"
+ val testTopicName3 = testTopicName + "3"
+ val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the
`local.retention.bytes` and `retention.bytes` " +
+ "must be set to the identical value because there will be no more logs
copied to the remote storage."
+
+ // 1. create a topic with `remote.log.copy.disable=true` and have
different local.retention.bytes and retention.bytes value,
+ // it should fail to create the topic
+ val topicConfig = new Properties()
+ topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
+ topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
+
+ val admin = createAdminClient()
+ val err = assertThrowsException(classOf[InvalidConfigurationException],
+ () => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions,
+ numReplicationFactor, topicConfig = topicConfig))
+ assertEquals(errorMsgBytes, err.getMessage)
+
+ // 2. change the local.retention.bytes value to the same value as
retention.bytes should successfully create the topic
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000")
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ // 3. change the local.retention.bytes value to "-2" should also
successfully create the topic
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
+ TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ // 4. create a topic with `remote.log.copy.disable=false` and have
different local.retention.bytes and retention.bytes value,
+ // it should successfully creates the topic.
+ topicConfig.clear()
+ topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
+ topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
+ TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ // 5. alter the config to `remote.log.copy.disable=true`, it should fail
the config change
+ val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET),
+ ))
+ val err2 = assertThrowsException(classOf[InvalidConfigurationException],
+ () => admin.incrementalAlterConfigs(configs).all().get())
+ assertEquals(errorMsgBytes, err2.getMessage)
+
+ // 6. alter the config to `remote.log.copy.disable=true` and
local.retention.bytes == retention.bytes, it should work without error
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000"),
+ AlterConfigOp.OpType.SET),
+ ))
+ admin.incrementalAlterConfigs(configs).all().get()
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 706a5ec30f2..264dfb36cfe 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -36,6 +36,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
+import
org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment,
ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
import scala.jdk.CollectionConverters._
@@ -68,7 +69,9 @@ object LogTestUtils {
indexIntervalBytes: Int =
ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT,
segmentIndexBytes: Int =
ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT,
fileDeleteDelayMs: Long =
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- remoteLogStorageEnable: Boolean =
LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE): LogConfig = {
+ remoteLogStorageEnable: Boolean =
LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE,
+ remoteLogCopyDisable: Boolean =
DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
+ remoteLogDeleteOnDisable: Boolean =
DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
@@ -83,6 +86,8 @@ object LogTestUtils {
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, segmentIndexBytes:
Integer)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, fileDeleteDelayMs:
java.lang.Long)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
remoteLogStorageEnable: java.lang.Boolean)
+ logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
remoteLogCopyDisable: java.lang.Boolean)
+ logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
remoteLogDeleteOnDisable: java.lang.Boolean)
new LogConfig(logProps)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index c18371854fe..b45c9171d66 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4187,6 +4187,88 @@ class UnifiedLogTest {
assertEquals(1, log.logSegments.size)
}
+ @Test
+ def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
+ def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+ val segmentBytes = createRecords.sizeInBytes()
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes,
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+ val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+ // Given 6 segments of 1 message each
+ for (_ <- 0 until 6) {
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ }
+ assertEquals(6, log.logSegments.size)
+
+ log.updateHighWatermark(log.logEndOffset)
+
+ // Should not delete local log because highest remote storage offset is -1
(default value)
+ log.deleteOldSegments()
+ assertEquals(6, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(0, log.localLogStartOffset())
+
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1)
+
+ log.deleteOldSegments()
+ assertEquals(4, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // add remoteCopyDisabled = true
+ val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true,
remoteLogCopyDisable = true)
+ log.updateConfig(copyDisabledLogConfig)
+
+ // No local logs will be deleted even though local retention bytes is 1
because we'll adopt retention.ms/bytes
+ // when remote.log.copy.disable = true
+ log.deleteOldSegments()
+ assertEquals(4, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // simulate the remote logs are all deleted due to retention policy
+ log.updateLogStartOffsetFromRemoteTier(2)
+ assertEquals(4, log.logSegments.size())
+ assertEquals(2, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // produce 3 more segments
+ for (_ <- 0 until 3) {
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ }
+ assertEquals(7, log.logSegments.size)
+ log.updateHighWatermark(log.logEndOffset)
+
+ // try to delete local logs again, 2 segments will be deleted this time
because we'll adopt retention.ms/bytes (retention.bytes = 5)
+ // when remote.log.copy.disable = true
+ log.deleteOldSegments()
+ assertEquals(5, log.logSegments.size())
+ assertEquals(4, log.logStartOffset)
+ assertEquals(4, log.localLogStartOffset())
+
+ // add localRetentionMs = 1, retentionMs = 1000
+ val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, localRetentionMs = 1, retentionMs = 1000,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true,
remoteLogCopyDisable = true)
+ log.updateConfig(retentionMsConfig)
+
+ // Should not delete any logs because no local logs expired using
retention.ms = 1000
+ mockTime.sleep(10)
+ log.deleteOldSegments()
+ assertEquals(5, log.logSegments.size())
+ assertEquals(4, log.logStartOffset)
+ assertEquals(4, log.localLogStartOffset())
+
+ // Should delete all logs because all of them are expired based on
retentionMs = 1000
+ mockTime.sleep(1000)
+ log.deleteOldSegments()
+ assertEquals(1, log.logSegments.size())
+ assertEquals(9, log.logStartOffset)
+ assertEquals(9, log.localLogStartOffset())
+ }
+
@Test
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 8d6df1ee93e..f3f151ddc18 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -184,6 +184,8 @@ public class LogConfig extends AbstractConfig {
public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS =
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
+ public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false;
+ public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG =
false;
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It
indicates the value to be derived from RetentionBytes
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates
the value to be derived from RetentionMs
@@ -635,6 +637,7 @@ public class LogConfig extends AbstractConfig {
validateNoRemoteStorageForCompactedTopic(newConfigs);
validateRemoteStorageRetentionSize(newConfigs);
validateRemoteStorageRetentionTime(newConfigs);
+ validateRetentionConfigsWhenRemoteCopyDisabled(newConfigs,
isRemoteLogStorageEnabled);
} else {
// The new config "remote.storage.enable" is false, validate if
it's turning from true to false
boolean wasRemoteLogEnabled =
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"));
@@ -651,6 +654,24 @@ public class LogConfig extends AbstractConfig {
}
}
+ public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?,
?> newConfigs, boolean isRemoteLogStorageEnabled) {
+ boolean isRemoteLogCopyDisabled = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
false);
+ long retentionMs = (Long)
newConfigs.get(TopicConfig.RETENTION_MS_CONFIG);
+ long localRetentionMs = (Long)
newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
+ long retentionBytes = (Long)
newConfigs.get(TopicConfig.RETENTION_BYTES_CONFIG);
+ long localRetentionBytes = (Long)
newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
+ if (isRemoteLogStorageEnabled && isRemoteLogCopyDisabled) {
+ if (localRetentionBytes != -2 && localRetentionBytes !=
retentionBytes) {
+ throw new InvalidConfigurationException("When
`remote.log.copy.disable` is set to true, the `local.retention.bytes` " +
+ "and `retention.bytes` must be set to the identical
value because there will be no more logs copied to the remote storage.");
+ }
+ if (localRetentionMs != -2 && localRetentionMs != retentionMs) {
+ throw new InvalidConfigurationException("When
`remote.log.copy.disable` is set to true, the `local.retention.ms` " +
+ "and `retention.ms` must be set to the identical value
because there will be no more logs copied to the remote storage.");
+ }
+ }
+ }
+
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?>
newConfigs) {
boolean isRemoteLogDeleteOnDisable = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
false);
boolean isRemoteLogCopyDisabled = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
false);
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
index 6bcaea6a363..2655f600b3b 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
@@ -60,8 +60,16 @@ public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1))
);
- final Map<String, String> disableCopy = new HashMap<>();
- disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
+ // local.retention.ms/bytes need to set to the same value as
retention.ms/bytes when disabling remote log copy
+ final Map<String, String> disableRemoteCopy = new HashMap<>();
+ disableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
"true");
+ disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
"-2");
+ disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2");
+
+ // revert the change to local.retention.bytes
+ final Map<String, String> enableRemoteCopy = new HashMap<>();
+ enableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
"false");
+ enableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
"1");
final Map<String, String> deleteOnDisable = new HashMap<>();
deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false");
@@ -78,7 +86,7 @@ public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness
new KeyValueSpec("k2", "v2"))
// disable remote log copy
.updateTopicConfig(topicA,
-
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+ disableRemoteCopy,
Collections.emptyList())
// make sure we can still consume from the beginning of the
topic to read data from local and remote storage
@@ -87,16 +95,16 @@ public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness
// re-enable remote log copy
.updateTopicConfig(topicA,
-
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
+ enableRemoteCopy,
Collections.emptyList())
// make sure the logs can be offloaded
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produce(topicA, p0, new KeyValueSpec("k3", "v3"))
- // explicitly disable remote log copy
+ // disable remote log copy again
.updateTopicConfig(topicA,
- disableCopy,
+ disableRemoteCopy,
Collections.emptyList())
// make sure we can still consume from the beginning of the
topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 3)