clolov commented on code in PR #13999:
URL: https://github.com/apache/kafka/pull/13999#discussion_r1267868782
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -366,7 +370,82 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception
         assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
         assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+    }
+
+    @Test
+    void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
+        LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+        when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(ans -> {
+            // waiting for verification
+            latch.await();
+            return null;
+        }).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
+        Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
+
+        // before running tasks, the remote log manager tasks should be all idle
+        assertEquals(1.0, yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
+        remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds);
+        assertTrue(yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
+        // unlock copyLogSegmentData
+        latch.countDown();
+    }
+    private double yammerMetricValue(String name) {

Review Comment:
   ```suggestion
    private double yammerMetricValue(String name) {
   ```

##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3379,6 +3396,246 @@ class ReplicaManagerTest {
     testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE)
   }

+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
+    val replicaId = if (isFromFollower) 1 else -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    // create a replicaManager with remoteLog enabled
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(leaderEpoch)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
+      // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
+      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
+
+      if (isFromFollower) {
+        // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
+        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
+      } else {
+        assertEquals(Errors.NONE, result.head._2.error)
+      }
+      assertEquals(startOffset, result.head._2.leaderLogStartOffset)
+      assertEquals(endOffset, result.head._2.leaderLogEndOffset)
+      assertEquals(highHW, result.head._2.highWatermark)
+      if (isFromFollower) {
+        assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      } else {
+        // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
+        assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = {
+    val replicaId = if (isFromFollower) 1 else -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    // create a replicaManager with remoteLog enabled
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(leaderEpoch)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val fetchOffset = 1
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        assertEquals(1, responseStatus.size)
+        assertEquals(tidp0, responseStatus.toMap.keySet.head)
+        val fetchPartitionData: FetchPartitionData = responseStatus.toMap.get(tidp0).get
+        // should only follower fetch enter callback since consumer fetch will enter remoteFetch purgatory
+        assertTrue(isFromFollower)
+        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, fetchPartitionData.error)
+        assertEquals(startOffset, fetchPartitionData.logStartOffset)
+        assertEquals(highHW, fetchPartitionData.highWatermark)
+      }
+
+      // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
+      replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
+
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      if (isFromFollower) {
+        verify(mockRemoteLogManager, never()).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+      } else {
+        verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+        val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
+        assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
+        assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
+        assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, remoteStorageFetchInfo.fetchInfo.currentLeaderEpoch.get())
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testRemoteLogReaderMetrics(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+
+    val props = new Properties()
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true)
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
+    // set log reader threads number to 2
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2)
+    val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
+    val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+    val mockLog = mock(classOf[UnifiedLog])
+    val brokerTopicStats = new BrokerTopicStats
+    val remoteLogManager = new RemoteLogManager(
+      remoteLogManagerConfig,
+      0,
+      TestUtils.tempRelativeDir("data").getAbsolutePath,
+      "clusterId",
+      time,
+      _ => Optional.of(mockLog),
+      brokerTopicStats)
+    val spyRLM = spy(remoteLogManager)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(leaderEpoch)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val fetchOffset = 1
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        assertEquals(1, responseStatus.size)
+        assertEquals(tidp0, responseStatus.toMap.keySet.head)
+      }
+
+      assertEquals(1.0, yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
+      assertEquals(0, yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
+
+      // our thread number is 2
+      val queueLatch = new CountDownLatch(2)
+      val doneLatch = new CountDownLatch(1)
+
+      doAnswer(_ => {
+        queueLatch.countDown()
+        // wait until verification completed
+        doneLatch.await()
+        new FetchDataInfo(new LogOffsetMetadata(startOffset), mock(classOf[Records]))
+      }).when(spyRLM).read(any())
+
+      // create 5 asyncRead tasks, which should enqueue 3 task
+      for (i <- 1 to 5)
+        replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
+
+      // wait until at least 2 task submitted to use all the available threads
+      queueLatch.await()
+      // make sure tasks are all submitted
+      Thread.sleep(100)
+      // RemoteLogReader should not be all idle
+      assertTrue(yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double] < 1.0)
+      // RemoteLogReader should queue some tasks
+      assertTrue(yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int] > 0)

Review Comment:
   I assume the above cannot be equal to 0.0 because of some uncertainty tolerance of when the metric is recorded, but why can this not be equal to 3?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
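For reference on the question above: with remote.log.reader.threads set to 2 and both worker threads parked inside the stubbed read, the 5 submitted fetches can leave at most 3 tasks waiting in the reader queue. The sketch below is a minimal, self-contained illustration of that bound using a plain ThreadPoolExecutor; the class and variable names are illustrative only and are not taken from the PR, and it does not answer why the test asserts only a non-zero queue depth rather than exactly 3.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReaderQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        // 2 worker threads with an unbounded queue, mirroring remote.log.reader.threads = 2 in the test above
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        CountDownLatch bothWorkersBusy = new CountDownLatch(2); // counted down once per running task
        CountDownLatch release = new CountDownLatch(1);         // released after the queue is inspected

        Runnable blockingRead = () -> {
            bothWorkersBusy.countDown();
            try {
                release.await(); // stand-in for the latched spyRLM.read(any()) stub
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // 5 "fetches" against a 2-thread pool: 2 run immediately, the rest wait in the queue
        for (int i = 0; i < 5; i++) {
            pool.execute(blockingRead);
        }

        bothWorkersBusy.await();
        // Here all submissions happened on this thread before the check, so the queue depth is exactly 3.
        // In the test the submissions go through fetchMessages and the metric is read separately,
        // which is presumably why the assertion only requires the queue to be non-empty.
        System.out.println("queued tasks: " + pool.getQueue().size());

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```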