ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1071365497
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2104,7 +2105,7 @@ object UnifiedLog extends Logging {
     // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
     // from the first segment.
     if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
-      (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
+      (producerStateManager.latestSnapshotOffset.asScala.isEmpty && reloadFromCleanShutdown)) {
       // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the

Review Comment:
   Can we call `!isPresent` instead of `asScala.isEmpty`?

##########
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##########
@@ -557,7 +558,7 @@ class LogLoaderTest {
       _topicId = None,
       keepPartitionMetadataFile = true)

-    verify(stateManager).removeStraySnapshots(any[Seq[Long]])
+    verify(stateManager).removeStraySnapshots((any[java.util.List[java.lang.Long]]))

Review Comment:
   Are the extra parentheses around `any` needed? We have a few similar examples in this file.

##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -340,8 +342,7 @@ class ProducerStateManagerTest {
     // After reloading from the snapshot, the transaction should still be considered late
     val reloadedStateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs,
       producerStateManagerConfig, time)
-    reloadedStateManager.truncateAndReload(logStartOffset = 0L,
-      logEndOffset = stateManager.mapEndOffset, currentTimeMs = time.milliseconds())
+    reloadedStateManager.truncateAndReload(0L,stateManager.mapEndOffset, time.milliseconds())

Review Comment:
   Nit: space missing after `,`.

##########
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProducerStateManagerConfig {
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Collections.singleton("producer.id.expiration.ms");
+    private volatile int producerIdExpirationMs;
+
+    public ProducerStateManagerConfig(int producerIdExpirationMs) {
+        this.producerIdExpirationMs = producerIdExpirationMs;
+    }
+
+    public void updateProducerIdExpirationMs(int producerIdExpirationMs) {

Review Comment:
   Nit: use `set` instead of `update`?
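Not part of the PR, but to make the intent of the new class concrete: a standalone Scala sketch of how a shared, volatile expiration setting is read and updated in place for dynamic reconfiguration. `StateManagerConfigSketch` is a stand-in, not the real `ProducerStateManagerConfig`, and `setProducerIdExpirationMs` is the hypothetical rename suggested above; the PR's method is `updateProducerIdExpirationMs`.

    object ProducerIdExpirationFlowSketch extends App {
      // Stand-in for the Java config class above; the field is volatile so concurrent readers see updates.
      final class StateManagerConfigSketch(initialExpirationMs: Int) {
        @volatile private var expirationMs: Int = initialExpirationMs
        def producerIdExpirationMs: Int = expirationMs
        def setProducerIdExpirationMs(newValue: Int): Unit = expirationMs = newValue
      }

      // One shared config instance: producer-state code reads it, dynamic config writes it.
      val config = new StateManagerConfigSketch(86400000)
      println(config.producerIdExpirationMs)

      // A dynamic update of producer.id.expiration.ms only needs to call the setter; existing
      // readers pick up the new value without being rebuilt.
      config.setProducerIdExpirationMs(3600000)
      println(config.producerIdExpirationMs)
    }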
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }

   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized {
-    producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
-      (producerId, producerIdEntry.lastSeq)
+    producerStateManager.activeProducers.asScala.map { case (producerId, producerIdEntry) =>
+      (producerId.toLong, producerIdEntry.lastSeq)
     }
-  }
+  }.toMap

Review Comment:
   Can we avoid this `toMap` copy? Same for the case a few lines below.

##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -652,7 +652,7 @@ class TransactionsTest extends IntegrationTestHarness {
     producer.commitTransaction()

     var producerStateEntry =
-      brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.head._2
+      brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.asScala.head._2

Review Comment:
   Can we use `.get(0)` instead of `asScala.head`? There's one other similar example in this file.

##########
core/src/test/scala/unit/kafka/log/LogTestUtils.scala:
##########
@@ -247,7 +246,7 @@ object LogTestUtils {
   }

   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
-    ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted
+    ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq

Review Comment:
   Is the `toSeq` at the end required? I'd have thought `sorted` returns a `Seq`.

##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -685,7 +685,7 @@ class TransactionsTest extends IntegrationTestHarness {
     // get here without having bumped the epoch. If bumping the epoch is possible, the producer will attempt to, so
     // check there that the epoch has actually increased
     producerStateEntry =
-      brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers(producerId)
+      brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId)

Review Comment:
   One difference in behavior is that Java's `Map.get` returns `null` if no mapping is found while Scala's `Map.apply` throws an exception. There's another similar example in this file.
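For illustration only (none of this is code from the PR): a standalone Scala sketch of the behavioral difference described above, using a made-up map and producer id.

    import scala.jdk.CollectionConverters._

    object MapGetVsApplySketch extends App {
      // Stand-in for activeProducers: a Java map keyed by producer id.
      val activeProducers = new java.util.HashMap[java.lang.Long, String]()
      activeProducers.put(7L, "entry for producer 7")

      // Java's Map.get: a missing key yields null, no exception is thrown.
      val missing: String = activeProducers.get(42L)
      println(missing == null) // true

      // Scala's Map.apply on the same data: a missing key throws NoSuchElementException.
      try activeProducers.asScala(42L)
      catch { case e: NoSuchElementException => println(s"apply threw: ${e.getMessage}") }
    }

So call sites that previously relied on `apply` failing fast may want an explicit null check (or `Option`/`Optional` handling) when switching to the Java map's `get`.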
##########
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProducerStateManagerConfig {
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Collections.singleton("producer.id.expiration.ms");

Review Comment:
   We should have a static final for this config name here and reference it from `KafkaConfig`.

##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -215,11 +212,11 @@ class UnifiedLogTest {
     log.close()

     val reopened = createLog(logDir, logConfig)
-    assertEquals(Some(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
+    assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)

     truncateFunc(reopened)(0L)
     assertEquals(None, reopened.firstUnstableOffset)
-    assertEquals(Map.empty, reopened.producerStateManager.activeProducers)
+    assertTrue(reopened.producerStateManager.activeProducers.isEmpty)

Review Comment:
   The previous approach is better as it includes the map contents in the failure message.

##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -88,7 +88,7 @@ class ProducerStateManagerTest {
     val producerEpoch = 2.toShort
     appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 27L)

-    val firstEntry = stateManager.lastEntry(producerId).getOrElse(throw new RuntimeException("Expected last entry to be defined"))
+    val firstEntry = stateManager.lastEntry(producerId).orElseThrow(() => throw new RuntimeException("Expected last entry to be defined"))

Review Comment:
   You are supposed to return the exception, not throw it (when using `orElseThrow`). Please update similar places in this file.
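Not from the PR, just to spell out the shape being asked for: a self-contained Scala sketch contrasting a supplier that returns the exception (the intended `orElseThrow` usage) with one that throws it, using a made-up `Optional` and message.

    import java.util.Optional

    object OrElseThrowSketch extends App {
      val lastEntry: Optional[String] = Optional.empty()

      // Intended usage: the supplier returns the exception and orElseThrow does the throwing.
      def returningTheException(): String =
        lastEntry.orElseThrow(() => new RuntimeException("Expected last entry to be defined"))

      // The pattern flagged above: the supplier throws the exception itself. For an unchecked
      // exception it fails the same way at runtime, but it works against the intent of the API,
      // which is why the review asks for the exception to be returned instead.
      def throwingInsideTheSupplier(): String =
        lastEntry.orElseThrow(() => throw new RuntimeException("Expected last entry to be defined"))

      try returningTheException()
      catch { case e: RuntimeException => println(s"returned form: ${e.getMessage}") }

      try throwingInsideTheSupplier()
      catch { case e: RuntimeException => println(s"thrown form: ${e.getMessage}") }
    }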
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1031,3 +1031,20 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     listeners.map(e => (e.listenerName, e)).toMap
 }
+
+class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
+  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    if (producerStateManagerConfig.producerIdExpirationMs() != newConfig.producerIdExpirationMs) {
+      info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs()} to ${newConfig.producerIdExpirationMs}")
+      producerStateManagerConfig.updateProducerIdExpirationMs(newConfig.producerIdExpirationMs)
+    }
+  }
+
+  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    if (newConfig.producerIdExpirationMs < 0)
+      throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} cannot be less than 0, current value is ${newConfig.producerIdExpirationMs}")

Review Comment:
   In the old code, the second parameter is `producerStateManagerConfig.producerIdExpirationMs`. I think we should actually include both the current and the new value.

##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -192,7 +192,8 @@ class ProducerStateManagerTest {
     // should be able to append with the new epoch if we start at sequence 0
     append(stateManager, producerId, bumpedProducerEpoch, 0, 2L)
-    assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
+    val value: Optional[Int] = stateManager.lastEntry(producerId).map(_.firstSeq)

Review Comment:
   Do we need to extract the val here?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org