kamalcph commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1323939601
##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -700,4 +700,5 @@ class BrokerServer(

   override def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)

+  override def remoteLogManager: Option[RemoteLogManager] = remoteLogManagerOpt

Review Comment:
   Why do we need this method? The `KafkaBroker` trait already has a [remoteLogManagerOpt](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaBroker.scala#L84) method.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -112,27 +115,54 @@ public class RemoteIndexCache implements Closeable {
      *
      * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
      */
-    private final Cache<Uuid, Entry> internalCache;
-    private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private Cache<Uuid, Entry> internalCache;

     public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {

Review Comment:
   Can we remove this constructor? (It's used only by the test.)

##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -129,7 +129,7 @@ class KafkaServer(
   var logDirFailureChannel: LogDirFailureChannel = _

   @volatile private var _logManager: LogManager = _
-  var remoteLogManagerOpt: Option[RemoteLogManager] = None
+  @volatile var remoteLogManagerOpt: Option[RemoteLogManager] = None

Review Comment:
   Why do we have to mark this as `volatile`?
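For context on the `volatile` question: a common reason for the marker is cross-thread visibility, since the field is presumably assigned on the startup thread and later read from dynamic-config handlers. A minimal sketch of that pattern, using hypothetical names rather than the actual broker code:

```scala
// Minimal sketch of the visibility hazard that @volatile guards against.
// All names here are hypothetical; this is not the actual broker code.
class Server {
  @volatile var remoteLogManagerOpt: Option[AnyRef] = None

  def startup(): Unit = {
    // Assigned once, on the startup thread.
    remoteLogManagerOpt = Some(new Object)
  }

  def handleDynamicConfigUpdate(): Unit = {
    // Read on a different thread; without @volatile the JVM memory model
    // does not guarantee this thread observes the assignment above.
    remoteLogManagerOpt.foreach(rlm => println(s"reconfiguring $rlm"))
  }
}
```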
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -89,7 +90,8 @@ object DynamicBrokerConfig {
     Set(KafkaConfig.MetricReporterClassesProp) ++
     DynamicListenerConfig.ReconfigurableConfigs ++
     SocketServer.ReconfigurableConfigs ++
-    ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
+    ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala ++
+    DynamicRemoteLogConfigs

Review Comment:
   Can we remove the static import and make the usage uniform for readability?
   ```
   DynamicRemoteLogConfig.ReconfigurableConfigs
   ```

##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1129,3 +1132,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer

   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 }
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicRemoteLogConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.forEach { (k, v) =>
+      if (DynamicRemoteLogConfigs.contains(k)) {
+        val newValue = v.asInstanceOf[Long]
+        val oldValue = getValue(server.config, k)
+        if (newValue != oldValue) {
+          val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
+          if (newValue <= 0)
+            throw new ConfigException(s"$errorMsg, value should be at least 1")
+        }
+      }
+    }
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    if (oldValue != newValue) {
+      val remoteLogManager = server.remoteLogManager
+      if (remoteLogManager.nonEmpty) {
+        remoteLogManager.get.resizeCacheSize(newValue)
+        info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
+          s"old value: $oldValue, new value: $newValue")
+      }
+    }
+  }
+
+  private def getValue(config: KafkaConfig, name: String): Long = {
+    name match {
+      case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
+        config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+      case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
+    }
+  }
+}
+
+object DynamicRemoteLogConfig {
+  val DynamicRemoteLogConfigs = Set(

Review Comment:
   Can we follow a uniform naming strategy for readability? Rename `DynamicRemoteLogConfigs` to `ReconfigurableConfigs`.

##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -230,7 +231,7 @@ class RemoteIndexCacheTest {

     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
-    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+    cache = new RemoteIndexCache(2L, rsm, tpDir.toString)

Review Comment:
   Why is this `2L` and not `2 * estimateEntryBytesSize`?

##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##########
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
                          atLeast(0),
                          LOW,
                          REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-                 .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+                 .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
                          LONG,
                          DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,

Review Comment:
   `DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES` still refers to 1 GB. What is the difference between the `DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES` and `DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES` defaults?
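For background on the `defineInternal` -> `define` change above: in Kafka's `ConfigDef`, internal configs are excluded from the generated public documentation, while `define` registers a regular, documented config. A minimal sketch, with made-up config names:

```scala
import org.apache.kafka.common.config.ConfigDef

// Illustration only; the config names below are made up.
val configDef = new ConfigDef()
  // Public: included in the generated documentation.
  .define("public.example.bytes", ConfigDef.Type.LONG, 1024L,
    ConfigDef.Importance.LOW, "An example of a documented config.")
  // Internal: excluded from the generated documentation.
  .defineInternal("internal.example.bytes", ConfigDef.Type.LONG, 1024L,
    ConfigDef.Importance.LOW)
```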
##########
core/src/main/scala/kafka/server/KafkaBroker.scala:
##########
@@ -95,6 +95,7 @@ trait KafkaBroker extends Logging {
   def brokerTopicStats: BrokerTopicStats
   def credentialProvider: CredentialProvider
   def clientToControllerChannelManager: NodeToControllerChannelManager
+  def remoteLogManager: Option[RemoteLogManager]

Review Comment:
   This is not required; please reuse the `remoteLogManagerOpt` method.

##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,6 +526,50 @@ class RemoteIndexCacheTest {
     }
   }

+  @Test
+  def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      Files.walk(cache.cacheDir())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
+    cache = new RemoteIndexCache(rsm, tpDir.toString)

Review Comment:
   Why do we reinitialise the cache?

##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -594,4 +575,16 @@ class RemoteIndexCacheTest {
       timeIndex.flush()
     }
   }
+
+  private def getEstimateEntryBytesSize(): Long = {
+    val cacheForEstimate = new RemoteIndexCache(2L, rsm, tpDir.toString)
+    val tpIdForEstimate = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataListForEstimate = generateRemoteLogSegmentMetadata(size = 1, tpIdForEstimate)
+    val entryForEstimate = cacheForEstimate.getIndexEntry(metadataListForEstimate.head)
+    val estimateEntryBytesSize = entryForEstimate.entrySizeBytes()
+    Utils.closeQuietly(cacheForEstimate, "RemoteIndexCache created for estimating entry size")
+    cleanup()
+    setup()

Review Comment:
   My preference is to use a separate partition directory inside the test, so we can avoid calling the `setUp` and `cleanup` methods that are shared by all the tests:

   ```scala
   private def estimateOneEntryBytesSize(): Long = {
     val tp = new TopicPartition("estimate-entry-bytes-size", 0)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), tp)
     val tpDir = new File(logDir, tpId.toString)
     Files.createDirectory(tpDir.toPath)
     val rsm = mock(classOf[RemoteStorageManager])
     mockRsmFetchIndex(rsm)
     val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
     val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
     val entry = cache.getIndexEntry(metadataList.head)
     val estimateEntryBytesSize = entry.entrySizeBytes()
     Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size")
     estimateEntryBytesSize
   }

   // We can reuse this method in the setup()
   private def mockRsmFetchIndex(rsm: RemoteStorageManager): Unit = {
     when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
         val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
         val timeIdx = createTimeIndexForSegmentMetadata(metadata)
         val txnIdx = createTxIndexForSegmentMetadata(metadata)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
           case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
           case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
           case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
           case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
         }
       })
   }
   ```
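If the helper above were adopted, it would also answer the earlier `2L` question; a sketch of a possible usage, assuming the suggested `estimateOneEntryBytesSize` exists:

```scala
// Sketch: size the cache to hold exactly two entries instead of a bare 2L.
val twoEntriesBytes = 2 * estimateOneEntryBytesSize()
cache = new RemoteIndexCache(twoEntriesBytes, rsm, tpDir.toString)
```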
