clolov commented on code in PR #14652:
URL: https://github.com/apache/kafka/pull/14652#discussion_r1377358311


##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest {
     assertTrue(fetcher.fetchState(partition).isEmpty)
     assertTrue(failedPartitions.contains(partition))
   }
+
+  private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, 
topicPartition: TopicPartition): Unit = {
+    val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition)
+    logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}")
+    val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString)
+    stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 
1000, producerStateManagerConfig, time)
+    val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir)
+
+    doReturn(unifiedLog, Nil: 
_*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition]))

Review Comment:
   For my understanding, what is the purpose of `Nil: _*`? Is it a roundabout 
way of verifying that the method is called only once, since a second call 
would throw a NullPointerException? If so, could you add an explicit 
verification of the number of invocations instead?



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest {
     assertTrue(fetcher.fetchState(partition).isEmpty)
     assertTrue(failedPartitions.contains(partition))
   }
+
+  private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, 
topicPartition: TopicPartition): Unit = {
+    val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition)
+    logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}")
+    val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString)
+    stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 
1000, producerStateManagerConfig, time)
+    val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir)
+
+    doReturn(unifiedLog, Nil: 
_*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition]))
+    val mockRemoteLogManager = mock(classOf[RemoteLogManager])
+    doReturn(Option.apply(mockRemoteLogManager), Nil: 
_*).when(mockReplicaMgr).remoteLogManager
+
+    val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(
+      RemoteLogSegmentId.generateNew(new TopicIdPartition(Uuid.randomUuid(), 
topicPartition)),
+      4L, 4L, -1L, brokerId, -1L, 1024,
+      Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 
Collections.singletonMap(4, 4L))
+    doReturn(Optional.of(remoteLogSegmentMetadata), Nil: 
_*).when(mockRemoteLogManager).fetchRemoteLogSegmentMetadata(any(classOf[TopicPartition]),
 ArgumentMatchers.eq(4), ArgumentMatchers.eq(4L))

Review Comment:
   Isn't the topicPartition that will be used here already known? Is there a 
reason to use matchers rather than the concrete value?



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest {
     assertTrue(fetcher.fetchState(partition).isEmpty)
     assertTrue(failedPartitions.contains(partition))
   }
+
+  private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, 
topicPartition: TopicPartition): Unit = {
+    val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition)
+    logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}")
+    val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString)
+    stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 
1000, producerStateManagerConfig, time)
+    val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir)
+
+    doReturn(unifiedLog, Nil: 
_*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition]))
+    val mockRemoteLogManager = mock(classOf[RemoteLogManager])
+    doReturn(Option.apply(mockRemoteLogManager), Nil: 
_*).when(mockReplicaMgr).remoteLogManager
+
+    val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(
+      RemoteLogSegmentId.generateNew(new TopicIdPartition(Uuid.randomUuid(), 
topicPartition)),

Review Comment:
   Doesn't this topicId have to be the same as the topicId in line 189 (the 
one generated for `idPartition`)?



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##########
@@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest {
     assertTrue(fetcher.fetchState(partition).isEmpty)
     assertTrue(failedPartitions.contains(partition))
   }
+
+  private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, 
topicPartition: TopicPartition): Unit = {
+    val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition)
+    logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}")
+    val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString)
+    stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 
1000, producerStateManagerConfig, time)
+    val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir)
+
+    doReturn(unifiedLog, Nil: 
_*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition]))
+    val mockRemoteLogManager = mock(classOf[RemoteLogManager])
+    doReturn(Option.apply(mockRemoteLogManager), Nil: 
_*).when(mockReplicaMgr).remoteLogManager
+
+    val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(
+      RemoteLogSegmentId.generateNew(new TopicIdPartition(Uuid.randomUuid(), 
topicPartition)),
+      4L, 4L, -1L, brokerId, -1L, 1024,
+      Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 
Collections.singletonMap(4, 4L))
+    doReturn(Optional.of(remoteLogSegmentMetadata), Nil: 
_*).when(mockRemoteLogManager).fetchRemoteLogSegmentMetadata(any(classOf[TopicPartition]),
 ArgumentMatchers.eq(4), ArgumentMatchers.eq(4L))
+
+    val mockPartition = mock(classOf[Partition])
+    doReturn(mockPartition, Nil: 
_*).when(mockReplicaMgr).getPartitionOrException(any(classOf[TopicPartition]))
+    when(mockPartition.truncateFullyAndStartAt(anyLong(), anyBoolean(), 
any(classOf[Option[Long]])))
+      .thenAnswer(ans => {
+        val newOffset = ans.getArgument[Long](0)
+        val logStartOffsetOpt = ans.getArgument[Option[Long]](2)
+        unifiedLog.truncateFullyAndStartAt(newOffset, logStartOffsetOpt)
+      })
+
+    val mockRemoteStorageManager = mock(classOf[RemoteStorageManager])
+    doReturn(mockRemoteStorageManager, Nil: 
_*).when(mockRemoteLogManager).storageManager()
+
+    tpDirForRemoteSnapshotFile = 
JTestUtils.tempDirectory(JTestUtils.tempDirectory(s"remote-kafka-${this.getClass.getSimpleName}").toPath,
 idPartition.toString)
+    val stateManagerForRemoteSnapshotFile = new 
ProducerStateManager(topicPartition, tpDirForRemoteSnapshotFile, 5 * 60 * 1000, 
producerStateManagerConfig, time)
+    val remoteSnapshotFile = 
prepareRemoteSnapshotFile(stateManagerForRemoteSnapshotFile)
+
+    
when(mockRemoteStorageManager.fetchIndex(any(classOf[RemoteLogSegmentMetadata]),
 any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val indexType = ans.getArgument[IndexType](1)
+        indexType match {
+          case IndexType.LEADER_EPOCH => new 
FileInputStream(JTestUtils.tempFile())
+          case IndexType.PRODUCER_SNAPSHOT => new 
FileInputStream(remoteSnapshotFile)
+          case IndexType.OFFSET => // not access here
+          case IndexType.TIMESTAMP => // not access here
+          case IndexType.TRANSACTION => // not access here
+        }
+      })
+  }
+
+  private def buildUnifiedLog(topicPartition: TopicPartition, 
producerStateManager: ProducerStateManager, tpDir: File): UnifiedLog = {
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    val logConfig = new LogConfig(topicConfig)
+    val mockScheduler = mock(classOf[Scheduler])
+    val producerIdExpirationCheckIntervalMs = 
kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs
+    val logDirFailureChannel = new LogDirFailureChannel(10)

Review Comment:
   Why 10? Isn't 1 log directory enough?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to