ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1071365497


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2104,7 +2105,7 @@ object UnifiedLog extends Logging {
     // (or later snapshots). Otherwise, if there is no snapshot file, then we 
have to rebuild producer state
     // from the first segment.
     if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
-      (producerStateManager.latestSnapshotOffset.isEmpty && 
reloadFromCleanShutdown)) {
+      (producerStateManager.latestSnapshotOffset.asScala.isEmpty && 
reloadFromCleanShutdown)) {
       // To avoid an expensive scan through all of the segments, we take empty 
snapshots from the start of the

Review Comment:
   Can we call `!isPresent` instead of `asScala.isEmpty`?



##########
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##########
@@ -557,7 +558,7 @@ class LogLoaderTest {
       _topicId = None,
       keepPartitionMetadataFile = true)
 
-    verify(stateManager).removeStraySnapshots(any[Seq[Long]])
+    
verify(stateManager).removeStraySnapshots((any[java.util.List[java.lang.Long]]))

Review Comment:
   Are the extra parentheses around `any` needed? We have a few similar 
examples in this file.



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -340,8 +342,7 @@ class ProducerStateManagerTest {
     // After reloading from the snapshot, the transaction should still be 
considered late
     val reloadedStateManager = new ProducerStateManager(partition, logDir, 
maxTransactionTimeoutMs,
       producerStateManagerConfig, time)
-    reloadedStateManager.truncateAndReload(logStartOffset = 0L,
-      logEndOffset = stateManager.mapEndOffset, currentTimeMs = 
time.milliseconds())
+    reloadedStateManager.truncateAndReload(0L,stateManager.mapEndOffset, 
time.milliseconds())

Review Comment:
   Nit: space missing after `,`.



##########
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProducerStateManagerConfig {
+    public static final Set<String> RECONFIGURABLE_CONFIGS = 
Collections.singleton("producer.id.expiration.ms");
+    private volatile int producerIdExpirationMs;
+
+    public ProducerStateManagerConfig(int producerIdExpirationMs) {
+        this.producerIdExpirationMs = producerIdExpirationMs;
+    }
+
+    public void updateProducerIdExpirationMs(int producerIdExpirationMs) {

Review Comment:
   Nit: use `set` instead of `update`?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {
-    producerStateManager.activeProducers.map { case (producerId, 
producerIdEntry) =>
-      (producerId, producerIdEntry.lastSeq)
+    producerStateManager.activeProducers.asScala.map { case (producerId, 
producerIdEntry) =>
+      (producerId.toLong, producerIdEntry.lastSeq)
     }
-  }
+  }.toMap

Review Comment:
   Can we avoid this `toMap` copy? Same for the case a few lines below.



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -652,7 +652,7 @@ class TransactionsTest extends IntegrationTestHarness {
       producer.commitTransaction()
 
       var producerStateEntry =
-        brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.head._2
+        brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers.asScala.head._2

Review Comment:
   Can we use `.get(0)` instead of `asScala.head`? There's one other similar 
example in this file.



##########
core/src/test/scala/unit/kafka/log/LogTestUtils.scala:
##########
@@ -247,7 +246,7 @@ object LogTestUtils {
   }
 
   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
-    ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted
+    
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq

Review Comment:
   Is the `toSeq` at the end required? I'd have thought `sorted` returns a 
`Seq`.



##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -685,7 +685,7 @@ class TransactionsTest extends IntegrationTestHarness {
       // get here without having bumped the epoch. If bumping the epoch is 
possible, the producer will attempt to, so
       // check there that the epoch has actually increased
       producerStateEntry =
-        brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers(producerId)
+        brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers.get(producerId)

Review Comment:
   One difference in behavior is that Java's `Map.get` returns `null` if no 
mapping is found while Scala's `Map.apply` throws an exception. There's another 
similar example in this file.



##########
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProducerStateManagerConfig {
+    public static final Set<String> RECONFIGURABLE_CONFIGS = 
Collections.singleton("producer.id.expiration.ms");

Review Comment:
   We should have a static final constant for this config name here and 
reference it from `KafkaConfig`.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -215,11 +212,11 @@ class UnifiedLogTest {
     log.close()
 
     val reopened = createLog(logDir, logConfig)
-    assertEquals(Some(new LogOffsetMetadata(3L)), 
reopened.producerStateManager.firstUnstableOffset)
+    assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager.firstUnstableOffset)
 
     truncateFunc(reopened)(0L)
     assertEquals(None, reopened.firstUnstableOffset)
-    assertEquals(Map.empty, reopened.producerStateManager.activeProducers)
+    assertTrue(reopened.producerStateManager.activeProducers.isEmpty)

Review Comment:
   The previous approach is better as it includes the map contents in the 
message.



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -88,7 +88,7 @@ class ProducerStateManagerTest {
     val producerEpoch = 2.toShort
     appendEndTxnMarker(stateManager, producerId, producerEpoch, 
ControlRecordType.COMMIT, offset = 27L)
 
-    val firstEntry = stateManager.lastEntry(producerId).getOrElse(throw new 
RuntimeException("Expected last entry to be defined"))
+    val firstEntry = stateManager.lastEntry(producerId).orElseThrow(() => 
throw new RuntimeException("Expected last entry to be defined"))

Review Comment:
   You are supposed to return the exception, not throw it (when using 
`orElseThrow`). Please update similar places in this file.



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1031,3 +1031,20 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
     listeners.map(e => (e.listenerName, e)).toMap
 
 }
+
+class DynamicProducerStateManagerConfig(val producerStateManagerConfig: 
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
+  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    if (producerStateManagerConfig.producerIdExpirationMs() != 
newConfig.producerIdExpirationMs) {
+      info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
${producerStateManagerConfig.producerIdExpirationMs()} to 
${newConfig.producerIdExpirationMs}")
+      
producerStateManagerConfig.updateProducerIdExpirationMs(newConfig.producerIdExpirationMs)
+    }
+  }
+
+  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    if (newConfig.producerIdExpirationMs < 0)
+      throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} 
cannot be less than 0, current value is ${newConfig.producerIdExpirationMs}")

Review Comment:
   In the old code, the second parameter is 
`producerStateManagerConfig.producerIdExpirationMs`. I think we should actually 
have both the current and new value.



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -192,7 +192,8 @@ class ProducerStateManagerTest {
 
     // should be able to append with the new epoch if we start at sequence 0
     append(stateManager, producerId, bumpedProducerEpoch, 0, 2L)
-    assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
+    val value: Optional[Int] = 
stateManager.lastEntry(producerId).map(_.firstSeq)

Review Comment:
   Do we need to extract the val here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to