showuon commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1278885944


##########
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##########
@@ -713,6 +714,99 @@ class DynamicBrokerConfigTest {
     config.updateCurrentConfig(new KafkaConfig(props))
     
assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))
   }
+
+  @Test
+  def testDynamicLogLocalRetentionMsConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, "2592000000")
+    val config = KafkaConfig(props)
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, 
"2160000000")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(2160000000L, config.logLocalRetentionMs)
+
+    // update per broker config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, 
"2150000000")
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(2150000000L, config.logLocalRetentionMs)
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionSizeConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    props.put(KafkaConfig.LogRetentionBytesProp, "4294967296")
+    val config = KafkaConfig(props)
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
+
+    val newProps = new Properties()
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, 
"4294967295")
+    // update default config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(4294967295L, config.logLocalRetentionBytes)
+
+    // update per broker config
+    config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+    newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, 
"4294967294")
+    config.dynamicConfig.updateBrokerConfig(0, newProps)
+    assertEquals(4294967294L, config.logLocalRetentionBytes)
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionSkipsOnInvalidConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
+    props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
+    val config = KafkaConfig(props)
+    config.dynamicConfig.initialize(None)
+
+    // Check for invalid localRetentionMs < -2
+    verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, 
Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3"))
+    // Check for invalid localRetentionBytes < -2
+    verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, 
Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP -> "-3"))
+  }
+
+  @Test
+  def testDynamicLogLocalRetentionThrowsOnIncorrectConfig(): Unit = {
+    // Check for incorrect case of logLocalRetentionMs > retentionMs
+    verifyIncorrectLogLocalRetentionProps(2000L, 2, 100, 1000L)
+    // Check for incorrect case of logLocalRetentionBytes > retentionBytes
+    verifyIncorrectLogLocalRetentionProps(500L, 200, 100, 1000L)
+    // Check for incorrect case of logLocalRetentionMs (-1 viz unlimited) > 
retentionMs,
+    verifyIncorrectLogLocalRetentionProps(-1, 200, 100, 1000L)
+    // Check for incorrect case of logLocalRetentionBytes(-1 viz unlimited) > 
retentionBytes
+    verifyIncorrectLogLocalRetentionProps(2000L, -1, 100, 1000L)
+  }
+
+  def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
+                                            logLocalRetentionBytes: Long,
+                                            retentionBytes: Long,
+                                            retentionMs: Long): Unit = {

Review Comment:
   nit: It's hard to review this validation method with the current parameter 
order. Could you group the configs of the same type in pairs? e.g.:
   ```
    def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,  
retentionMs: Long,  logLocalRetentionBytes: Long, retentionBytes: Long) 
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1784,4 +1788,22 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ConsumerGroupHeartbeatIntervalMsProp, "25")
     assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
   }
+
+  @Test
+  def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
String.valueOf(true))
+    props.put(KafkaConfig.LogDirsProp, "/tmp/a,/tmp/b")
+
+    val caught = assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(props))
+    assertTrue(caught.getMessage.contains("Multiple log directories 
`/tmp/a,/tmp/b` are not supported when remote log storage is enabled"))
+  }
+
+  @Test
+  def testSingleLogDirectoryWithRemoteLogStorage(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
String.valueOf(true))
+    props.put(KafkaConfig.LogDirsProp, "/tmp/a")
+    KafkaConfig.fromProps(props)

Review Comment:
   nit: `assertDoesNotThrow(() => KafkaConfig.fromProps(props))`



##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##########
@@ -215,7 +227,19 @@ public final class RemoteLogManagerConfig {
                                   DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS,
                                   atLeast(1),
                                   MEDIUM,
-                                  REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC);
+                                  REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC)
+                  .defineInternal(LOG_LOCAL_RETENTION_MS_PROP,
+                          LONG,
+                          DEFAULT_LOG_LOCAL_RETENTION_MS,
+                          atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS),
+                          MEDIUM,
+                          LOG_LOCAL_RETENTION_MS_DOC)
+                  .defineInternal(LOG_LOCAL_RETENTION_BYTES_PROP,
+                          LONG,
+                          DEFAULT_LOG_LOCAL_RETENTION_BYTES,

Review Comment:
   Could we align the indentation of these lines with the lines above?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -424,6 +387,18 @@ public int initFileSize() {
             return 0;
     }
 
+    public boolean remoteStorageEnable() {
+        return remoteLogConfig.remoteStorageEnable;
+    }
+
+    public long localRetentionMs() {
+        return remoteLogConfig.localRetentionMs == 
LogConfig.DEFAULT_LOCAL_RETENTION_MS ? retentionMs : 
remoteLogConfig.localRetentionMs;
+    }
+
+    public long localRetentionBytes() {
+        return remoteLogConfig.localRetentionBytes == 
LogConfig.DEFAULT_LOCAL_RETENTION_BYTES ? retentionSize : 
remoteLogConfig.localRetentionBytes;

Review Comment:
   When will we use them? I think they should be called whenever the value is 
retrieved, since we don't expect to get a value of `-2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to