clolov commented on code in PR #14652: URL: https://github.com/apache/kafka/pull/14652#discussion_r1377358311
########## core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala: ########## @@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest { assertTrue(fetcher.fetchState(partition).isEmpty) assertTrue(failedPartitions.contains(partition)) } + + private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, topicPartition: TopicPartition): Unit = { + val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition) + logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}") + val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString) + stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 1000, producerStateManagerConfig, time) + val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir) + + doReturn(unifiedLog, Nil: _*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition])) Review Comment: For my understanding, what is the purpose of `Nil: _*`? Is it a roundabout way to verify that the method is called only once because on the second call you would get a NullPointerException? If so, can you have an explicit verification of the number of invocations? ########## core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala: ########## @@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest { assertTrue(fetcher.fetchState(partition).isEmpty) assertTrue(failedPartitions.contains(partition)) } + + private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, topicPartition: TopicPartition): Unit = { + val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition) + logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}") + val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString) + stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 1000, producerStateManagerConfig, time) + val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir) + + doReturn(unifiedLog, Nil: _*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition])) + val mockRemoteLogManager = mock(classOf[RemoteLogManager]) + doReturn(Option.apply(mockRemoteLogManager), Nil: _*).when(mockReplicaMgr).remoteLogManager + + val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata( + RemoteLogSegmentId.generateNew(new TopicIdPartition(Uuid.randomUuid(), topicPartition)), + 4L, 4L, -1L, brokerId, -1L, 1024, + Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(4, 4L)) + doReturn(Optional.of(remoteLogSegmentMetadata), Nil: _*).when(mockRemoteLogManager).fetchRemoteLogSegmentMetadata(any(classOf[TopicPartition]), ArgumentMatchers.eq(4), ArgumentMatchers.eq(4L)) Review Comment: Do you not know the topicPartition which will be used here? Is there a reason to use matchers? ########## core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala: ########## @@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest { assertTrue(fetcher.fetchState(partition).isEmpty) assertTrue(failedPartitions.contains(partition)) } + + private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, topicPartition: TopicPartition): Unit = { + val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition) + logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}") + val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString) + stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 1000, producerStateManagerConfig, time) + val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir) + + doReturn(unifiedLog, Nil: _*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition])) + val mockRemoteLogManager = mock(classOf[RemoteLogManager]) + doReturn(Option.apply(mockRemoteLogManager), Nil: _*).when(mockReplicaMgr).remoteLogManager + + val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata( + RemoteLogSegmentId.generateNew(new TopicIdPartition(Uuid.randomUuid(), topicPartition)), Review Comment: Doesn't this topicId have to be the same as the topicId in line 189? ########## core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala: ########## @@ -121,4 +184,106 @@ class ReplicaFetcherTierStateMachineTest { assertTrue(fetcher.fetchState(partition).isEmpty) assertTrue(failedPartitions.contains(partition)) } + + private def mockBuildRemoteLogAuxState(mockReplicaMgr: ReplicaManager, topicPartition: TopicPartition): Unit = { + val idPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition) + logDir = JTestUtils.tempDirectory(s"kafka-${this.getClass.getSimpleName}") + val tpDir = JTestUtils.tempDirectory(logDir.toPath, idPartition.toString) + stateManager = new ProducerStateManager(topicPartition, tpDir, 5 * 60 * 1000, producerStateManagerConfig, time) + val unifiedLog = buildUnifiedLog(topicPartition, stateManager, tpDir) + + doReturn(unifiedLog, Nil: _*).when(mockReplicaMgr).localLogOrException(any(classOf[TopicPartition])) + val mockRemoteLogManager = mock(classOf[RemoteLogManager]) + doReturn(Option.apply(mockRemoteLogManager), Nil: _*).when(mockReplicaMgr).remoteLogManager + + val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata( + RemoteLogSegmentId.generateNew(new TopicIdPartition(Uuid.randomUuid(), topicPartition)), + 4L, 4L, -1L, brokerId, -1L, 1024, + Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(4, 4L)) + doReturn(Optional.of(remoteLogSegmentMetadata), Nil: _*).when(mockRemoteLogManager).fetchRemoteLogSegmentMetadata(any(classOf[TopicPartition]), ArgumentMatchers.eq(4), ArgumentMatchers.eq(4L)) + + val mockPartition = mock(classOf[Partition]) + doReturn(mockPartition, Nil: _*).when(mockReplicaMgr).getPartitionOrException(any(classOf[TopicPartition])) + when(mockPartition.truncateFullyAndStartAt(anyLong(), anyBoolean(), any(classOf[Option[Long]]))) + .thenAnswer(ans => { + val newOffset = ans.getArgument[Long](0) + val logStartOffsetOpt = ans.getArgument[Option[Long]](2) + unifiedLog.truncateFullyAndStartAt(newOffset, logStartOffsetOpt) + }) + + val mockRemoteStorageManager = mock(classOf[RemoteStorageManager]) + doReturn(mockRemoteStorageManager, Nil: _*).when(mockRemoteLogManager).storageManager() + + tpDirForRemoteSnapshotFile = JTestUtils.tempDirectory(JTestUtils.tempDirectory(s"remote-kafka-${this.getClass.getSimpleName}").toPath, idPartition.toString) + val stateManagerForRemoteSnapshotFile = new ProducerStateManager(topicPartition, tpDirForRemoteSnapshotFile, 5 * 60 * 1000, producerStateManagerConfig, time) + val remoteSnapshotFile = prepareRemoteSnapshotFile(stateManagerForRemoteSnapshotFile) + + when(mockRemoteStorageManager.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { + val indexType = ans.getArgument[IndexType](1) + indexType match { + case IndexType.LEADER_EPOCH => new FileInputStream(JTestUtils.tempFile()) + case IndexType.PRODUCER_SNAPSHOT => new FileInputStream(remoteSnapshotFile) + case IndexType.OFFSET => // not access here + case IndexType.TIMESTAMP => // not access here + case IndexType.TRANSACTION => // not access here + } + }) + } + + private def buildUnifiedLog(topicPartition: TopicPartition, producerStateManager: ProducerStateManager, tpDir: File): UnifiedLog = { + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + val logConfig = new LogConfig(topicConfig) + val mockScheduler = mock(classOf[Scheduler]) + val producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs + val logDirFailureChannel = new LogDirFailureChannel(10) Review Comment: Why 10? Isn't 1 log directory enough? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org