This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94f5a4f63e7 KAFKA-17135 Add unit test for
`ProducerStateManager#readSnapshot` and `ProducerStateManager#writeSnapshot`
(#16603)
94f5a4f63e7 is described below
commit 94f5a4f63e7a385325fc9a0f7d592a417d25711f
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Jul 18 18:23:29 2024 +0800
KAFKA-17135 Add unit test for `ProducerStateManager#readSnapshot` and
`ProducerStateManager#writeSnapshot` (#16603)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../unit/kafka/log/ProducerStateManagerTest.scala | 36 +++++++++++++++++++++-
.../internals/log/ProducerStateManager.java | 3 +-
2 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 10e33588486..d99e2f2aa70 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CompletedTxn,
LogFileUtils, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry,
ProducerStateManager, ProducerStateManagerConfig, TxnMetadata,
VerificationStateEntry}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, BatchMetadata,
CompletedTxn, LogFileUtils, LogOffsetMetadata, ProducerAppendInfo,
ProducerStateEntry, ProducerStateManager, ProducerStateManagerConfig,
TxnMetadata, VerificationStateEntry}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -1175,6 +1175,40 @@ class ProducerStateManagerTest {
assertNull(stateManager.verificationStateEntry(producerId))
}
+ @Test
+ def testReadWriteSnapshot(): Unit = {
+ val expectedEntryMap = new java.util.HashMap[java.lang.Long,
ProducerStateEntry]
+ expectedEntryMap.put(1, new ProducerStateEntry(1L, 2, 3,
RecordBatch.NO_TIMESTAMP,
+ OptionalLong.of(100L), java.util.Optional.of(new BatchMetadata(1, 2L, 3,
RecordBatch.NO_TIMESTAMP))))
+ expectedEntryMap.put(11, new ProducerStateEntry(11L, 12, 13, 123456L,
+ OptionalLong.empty(), java.util.Optional.empty()))
+
+ def assertEntries(actual: util.List[ProducerStateEntry]): Unit = {
+ val actualEntryMap = actual.asScala.map(p => (p.producerId(),
p)).toMap.asJava
+ assertEquals(expectedEntryMap.keySet(), actualEntryMap.keySet())
+ expectedEntryMap.forEach {
+ case (producerId, entry) =>
+ assertProducerStateEntry(entry, actualEntryMap.get(producerId))
+ }
+ }
+
+ def assertProducerStateEntry(expected: ProducerStateEntry, actual:
ProducerStateEntry): Unit = {
+ assertEquals(expected.producerId(), actual.producerId())
+ assertEquals(expected.producerEpoch(), actual.producerEpoch())
+ assertEquals(expected.coordinatorEpoch(), actual.coordinatorEpoch())
+ assertEquals(expected.lastTimestamp(), actual.lastTimestamp())
+ assertEquals(expected.currentTxnFirstOffset(),
actual.currentTxnFirstOffset())
+ assertIterableEquals(expected.batchMetadata(), actual.batchMetadata())
+ }
+
+ val tmpDir: File = TestUtils.tempDir()
+ try {
+ val file = new File(tmpDir, "testReadWriteSnapshot")
+ ProducerStateManager.writeSnapshot(file, expectedEntryMap, true)
+ assertEntries(ProducerStateManager.readSnapshot(file))
+ } finally Utils.delete(tmpDir)
+ }
+
private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel =>
Unit): Unit = {
val epoch = 0.toShort
val producerId = 1L
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index 6a3d3ed759d..883f2059a4b 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -685,7 +685,8 @@ public class ProducerStateManager {
}
}
- private static void writeSnapshot(File file, Map<Long, ProducerStateEntry>
entries, boolean sync) throws IOException {
+ // visible for testing
+ public static void writeSnapshot(File file, Map<Long, ProducerStateEntry>
entries, boolean sync) throws IOException {
Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries