clolov commented on code in PR #13999:
URL: https://github.com/apache/kafka/pull/13999#discussion_r1267868782


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -366,7 +370,82 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception
         assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
         assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+    }
+
+    @Test
+    void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
 
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
+        LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+        when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(ans -> {
+            // waiting for verification
+            latch.await();
+            return null;
+        }).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
+        Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
+
+        // before running the tasks, the remote log manager tasks should all be idle
+        assertEquals(1.0, yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
+        remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds);
+        assertTrue(yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
+        // unlock copyLogSegmentData
+        latch.countDown();
+    }
+    private double yammerMetricValue(String name) {

Review Comment:
   ```suggestion
   
       private double yammerMetricValue(String name) {
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3379,6 +3396,246 @@ class ReplicaManagerTest {
     testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
+    val replicaId = if (isFromFollower) 1 else -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    // create a replicaManager with remoteLog enabled
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(leaderEpoch)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
+      // when reading the log, it'll throw OffsetOutOfRangeException, which will be handled separately
+      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
+
+      if (isFromFollower) {
+        // expect an OFFSET_MOVED_TO_TIERED_STORAGE error to be returned if it's from a follower, since the data is already available in the remote log
+        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
+      } else {
+        assertEquals(Errors.NONE, result.head._2.error)
+      }
+      assertEquals(startOffset, result.head._2.leaderLogStartOffset)
+      assertEquals(endOffset, result.head._2.leaderLogEndOffset)
+      assertEquals(highHW, result.head._2.highWatermark)
+      if (isFromFollower) {
+        assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      } else {
+        // for a consumer fetch, we should return a delayedRemoteStorageFetch to wait for the remote fetch
+        assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = {
+    val replicaId = if (isFromFollower) 1 else -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    // create a replicaManager with remoteLog enabled
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(leaderEpoch)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val fetchOffset = 1
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        assertEquals(1, responseStatus.size)
+        assertEquals(tidp0, responseStatus.toMap.keySet.head)
+        val fetchPartitionData: FetchPartitionData = responseStatus.toMap.get(tidp0).get
+        // only the follower fetch should enter the callback, since a consumer fetch will enter the remote fetch purgatory
+        assertTrue(isFromFollower)
+        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, fetchPartitionData.error)
+        assertEquals(startOffset, fetchPartitionData.logStartOffset)
+        assertEquals(highHW, fetchPartitionData.highWatermark)
+      }
+
+      // when reading the log, it'll throw OffsetOutOfRangeException, which will be handled separately
+      replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
+
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      if (isFromFollower) {
+        verify(mockRemoteLogManager, never()).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+      } else {
+        verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+        val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
+        assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
+        assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
+        assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, remoteStorageFetchInfo.fetchInfo.currentLeaderEpoch.get())
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testRemoteLogReaderMetrics(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+
+    val props = new Properties()
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true)
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
+    // set the number of remote log reader threads to 2
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2)
+    val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
+    val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+    val mockLog = mock(classOf[UnifiedLog])
+    val brokerTopicStats = new BrokerTopicStats
+    val remoteLogManager = new RemoteLogManager(
+      remoteLogManagerConfig,
+      0,
+      TestUtils.tempRelativeDir("data").getAbsolutePath,
+      "clusterId",
+      time,
+      _ => Optional.of(mockLog),
+      brokerTopicStats)
+    val spyRLM = spy(remoteLogManager)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicIds = Map(tp0.topic -> topicId).asJava
+      val leaderEpoch = 0
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(
+          new LeaderAndIsrPartitionState()
+            .setTopicName(tp0.topic)
+            .setPartitionIndex(tp0.partition)
+            .setControllerEpoch(0)
+            .setLeader(leaderEpoch)
+            .setLeaderEpoch(0)
+            .setIsr(partition0Replicas)
+            .setPartitionEpoch(0)
+            .setReplicas(partition0Replicas)
+            .setIsNew(true)
+        ).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val fetchOffset = 1
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        assertEquals(1, responseStatus.size)
+        assertEquals(tidp0, responseStatus.toMap.keySet.head)
+      }
+
+      assertEquals(1.0, yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
+      assertEquals(0, yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
+
+      // our reader thread count is 2
+      val queueLatch = new CountDownLatch(2)
+      val doneLatch = new CountDownLatch(1)
+
+      doAnswer(_ => {
+        queueLatch.countDown()
+        // wait until verification completed
+        doneLatch.await()
+        new FetchDataInfo(new LogOffsetMetadata(startOffset), mock(classOf[Records]))
+      }).when(spyRLM).read(any())
+
+      // create 5 asyncRead tasks, which should enqueue 3 tasks
+      for (i <- 1 to 5)
+        replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
+
+      // wait until at least 2 tasks are submitted to use all the available threads
+      queueLatch.await()
+      // make sure tasks are all submitted
+      Thread.sleep(100)
+      // RemoteLogReader should not be all idle
+      assertTrue(yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double] < 1.0)
+      // RemoteLogReader should queue some tasks
+      assertTrue(yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int] > 0)

Review Comment:
   I assume the above cannot be equal to 0.0 because of some timing uncertainty in when the metric is recorded, but why can this not be equal to 3?
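   
   A minimal sketch of the timing reasoning, assuming the pool of 2 reader threads configured via `REMOTE_LOG_READER_THREADS_PROP` above and the 5 submitted fetches (the standalone executor and the `ReaderQueueSketch` name are illustrative, not the actual RemoteLogManager internals):
   
   ```scala
   import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
   
   object ReaderQueueSketch extends App {
     val queue = new LinkedBlockingQueue[Runnable]()
     // 2 worker threads, mirroring REMOTE_LOG_READER_THREADS_PROP = 2 in the test
     val pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue)
     val blocker = new CountDownLatch(1)
   
     // submit 5 blocking tasks, mirroring the 5 fetchMessages calls above
     (1 to 5).foreach(_ => pool.execute(() => blocker.await()))
   
     // the 2 workers each hold one blocked task; the remaining 3 sit in the queue
     println(s"queued = ${queue.size()}") // prints "queued = 3" in this isolated sketch
   
     blocker.countDown()
     pool.shutdown()
   }
   ```
   
   In this isolated sketch the queue holds exactly 3 tasks, but in the test the 5 fetches are routed through the fetch path before they reach `asyncRead`, so the count observed at assertion time seems less predictable, which is presumably why only `> 0` is asserted.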


