wuchong commented on code in PR #2926:
URL: https://github.com/apache/fluss/pull/2926#discussion_r2985598075
##########
fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java:
##########
@@ -374,6 +376,83 @@ tb, genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 0)),
() ->
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
}
+ @Test
+ void testFetchTimeoutReleasesPooledByteBuf() throws Exception {
+ // This test verifies that when a fetchLog RPC times out, the pooled ByteBuf
+ // held by the late-arriving FetchLogResponse is properly released.
+ // Without the fix, the ByteBuf would leak, causing Netty direct memory growth.
+
+ ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ try {
+ Configuration conf = new Configuration();
+ ServerNode followerNode =
+ new ServerNode(
+ followerServerId,
+ "localhost",
+ 10001,
+ ServerType.TABLET_SERVER,
+ "rack2");
+ TestingLeaderEndpoint testingEndpoint =
+ new TestingLeaderEndpoint(conf, leaderRM, followerNode);
+
+ // Append records to leader so fetch responses carry actual data
+ CompletableFuture<List<ProduceLogResultForBucket>> future = new CompletableFuture<>();
+ leaderRM.appendRecordsToLog(
+ 1000,
+ 1,
+ Collections.singletonMap(tb, genMemoryLogRecordsByObject(DATA1)),
+ null,
+ future::complete);
+ assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L));
+
+ // Configure the endpoint to delay responses by 3 seconds (longer than 1s timeout)
+ testingEndpoint.setFetchDelay(scheduler, 3000);
+
+ // Create a fetcher with a very short timeout (1 second) to trigger timeout quickly
+ ReplicaFetcherThread timeoutFetcher =
+ new ReplicaFetcherThread(
+ "test-timeout-fetcher",
+ followerRM,
+ testingEndpoint,
+ 1000,
+ 1 /* 1 second timeout */);
+
+ timeoutFetcher.addBuckets(
+ Collections.singletonMap(
+ tb,
+ new InitialFetchStatus(
+ DATA1_TABLE_ID, DATA1_TABLE_PATH, leader.id(), 0L)));
+
+ // Start the fetcher - it will send fetches, each timing out after 1s,
+ // then the delayed responses arrive after 3s
+ timeoutFetcher.start();
+
+ // Wait for at least one timeout + delayed response cycle to complete
+ Thread.sleep(5000);
+
+ // Shutdown the fetcher to stop new requests
+ timeoutFetcher.shutdown();
+
+ // Wait a bit more for any remaining delayed responses to arrive and be cleaned up
+ Thread.sleep(2000);
Review Comment:
On slower CI hosts, or if the fetcher starts one more delayed request just
before `shutdown()`, these fixed `Thread.sleep(5000)` / `Thread.sleep(2000)`
calls do not guarantee that every scheduled late response has completed before
the assertions run. In that case `allBufs` can still include an in-flight
buffer with `refCnt() == 1`, so this new regression test will fail
nondeterministically even when the production fix is correct.
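
One possible direction, as a minimal sketch rather than a patch for this PR: replace the fixed sleeps with a bounded poll on the condition the assertions actually depend on. Everything below is hypothetical and stands in for the real test: `PollInsteadOfSleep` and `waitUntil` are illustrative names, and the `AtomicInteger` counters only mimic `refCnt()` on the buffers tracked in `allBufs`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class PollInsteadOfSleep {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition became true before the deadline.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed and the condition still does not hold
            }
            Thread.sleep(pollMs);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for allBufs: each counter plays the role of refCnt().
        List<AtomicInteger> refCounts = new CopyOnWriteArrayList<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            for (int i = 0; i < 3; i++) {
                AtomicInteger refCnt = new AtomicInteger(1);
                refCounts.add(refCnt);
                // Simulates a late response whose buffer is released after a delay.
                scheduler.schedule(() -> refCnt.set(0), 100L * (i + 1), TimeUnit.MILLISECONDS);
            }
            // Wait only as long as needed, with a generous upper bound for slow hosts.
            boolean allReleased =
                    waitUntil(() -> refCounts.stream().allMatch(c -> c.get() == 0), 5000, 20);
            System.out.println("all released: " + allReleased);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In the real test the polled condition would be the `refCnt()` check over `allBufs`. An alternative that avoids polling altogether may be to call `scheduler.shutdown()` followed by `awaitTermination(...)` before asserting, since a `ScheduledThreadPoolExecutor` runs already-scheduled delayed tasks after `shutdown()` by default; whether that fits depends on how `setFetchDelay` uses the scheduler.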