This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 94f5a4f63e7 KAFKA-17135 Add unit test for `ProducerStateManager#readSnapshot` and `ProducerStateManager#writeSnapshot` (#16603)
94f5a4f63e7 is described below

commit 94f5a4f63e7a385325fc9a0f7d592a417d25711f
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Jul 18 18:23:29 2024 +0800

    KAFKA-17135 Add unit test for `ProducerStateManager#readSnapshot` and `ProducerStateManager#writeSnapshot` (#16603)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 36 +++++++++++++++++++++-
 .../internals/log/ProducerStateManager.java        |  3 +-
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 10e33588486..d99e2f2aa70 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CompletedTxn, 
LogFileUtils, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, 
ProducerStateManager, ProducerStateManagerConfig, TxnMetadata, 
VerificationStateEntry}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, BatchMetadata, 
CompletedTxn, LogFileUtils, LogOffsetMetadata, ProducerAppendInfo, 
ProducerStateEntry, ProducerStateManager, ProducerStateManagerConfig, 
TxnMetadata, VerificationStateEntry}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -1175,6 +1175,40 @@ class ProducerStateManagerTest {
     assertNull(stateManager.verificationStateEntry(producerId))
   }
 
+  @Test
+  def testReadWriteSnapshot(): Unit = {
+    val expectedEntryMap = new java.util.HashMap[java.lang.Long, 
ProducerStateEntry]
+    expectedEntryMap.put(1, new ProducerStateEntry(1L, 2, 3, 
RecordBatch.NO_TIMESTAMP,
+      OptionalLong.of(100L), java.util.Optional.of(new BatchMetadata(1, 2L, 3, 
RecordBatch.NO_TIMESTAMP))))
+    expectedEntryMap.put(11, new ProducerStateEntry(11L, 12, 13, 123456L,
+      OptionalLong.empty(), java.util.Optional.empty()))
+
+    def assertEntries(actual: util.List[ProducerStateEntry]): Unit = {
+      val actualEntryMap = actual.asScala.map(p => (p.producerId(), 
p)).toMap.asJava
+      assertEquals(expectedEntryMap.keySet(), actualEntryMap.keySet())
+      expectedEntryMap.forEach {
+        case (producerId, entry) =>
+          assertProducerStateEntry(entry, actualEntryMap.get(producerId))
+      }
+    }
+
+    def assertProducerStateEntry(expected: ProducerStateEntry, actual: 
ProducerStateEntry): Unit = {
+      assertEquals(expected.producerId(), actual.producerId())
+      assertEquals(expected.producerEpoch(), actual.producerEpoch())
+      assertEquals(expected.coordinatorEpoch(), actual.coordinatorEpoch())
+      assertEquals(expected.lastTimestamp(), actual.lastTimestamp())
+      assertEquals(expected.currentTxnFirstOffset(), 
actual.currentTxnFirstOffset())
+      assertIterableEquals(expected.batchMetadata(), actual.batchMetadata())
+    }
+
+    val tmpDir: File = TestUtils.tempDir()
+    try {
+      val file = new File(tmpDir, "testReadWriteSnapshot")
+      ProducerStateManager.writeSnapshot(file, expectedEntryMap, true)
+      assertEntries(ProducerStateManager.readSnapshot(file))
+    } finally Utils.delete(tmpDir)
+  }
+
   private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => 
Unit): Unit = {
     val epoch = 0.toShort
     val producerId = 1L
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index 6a3d3ed759d..883f2059a4b 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -685,7 +685,8 @@ public class ProducerStateManager {
         }
     }
 
-    private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> 
entries, boolean sync) throws IOException {
+    // visible for testing
+    public static void writeSnapshot(File file, Map<Long, ProducerStateEntry> 
entries, boolean sync) throws IOException {
         Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
         struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
         struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries

Reply via email to