wuchong commented on code in PR #1666:
URL: https://github.com/apache/fluss/pull/1666#discussion_r2348627118


##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -243,18 +273,21 @@ private TableOrPartitions 
getTableOrPartitionsInFetchRequest(FetchLogRequest fet
         return new TableOrPartitions(tableIdsInFetchRequest, 
tablePartitionsInFetchRequest);
     }
 
-    private static class TableOrPartitions {
+    /** A helper class to hold table ids or table partitions. */
+    @VisibleForTesting
+    public static class TableOrPartitions {
         private final @Nullable Set<Long> tableIds;
         private final @Nullable Set<TablePartition> tablePartitions;
 
-        private TableOrPartitions(
+        TableOrPartitions(
                 @Nullable Set<Long> tableIds, @Nullable Set<TablePartition> 
tablePartitions) {
             this.tableIds = tableIds;
             this.tablePartitions = tablePartitions;
         }
     }
 
-    private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) 
{
+    @VisibleForTesting
+    public void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {

Review Comment:
   Change them to package visible, it is only accessed in the same test class. 



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -243,18 +273,21 @@ private TableOrPartitions 
getTableOrPartitionsInFetchRequest(FetchLogRequest fet
         return new TableOrPartitions(tableIdsInFetchRequest, 
tablePartitionsInFetchRequest);
     }
 
-    private static class TableOrPartitions {
+    /** A helper class to hold table ids or table partitions. */
+    @VisibleForTesting
+    public static class TableOrPartitions {

Review Comment:
   Change them to package visible, it is only accessed in the same test class. 



##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java:
##########
@@ -183,6 +188,37 @@ void testFetchWhenDestinationIsNullInMetadata() throws 
Exception {
         assertThat(records.get(tb0).size()).isEqualTo(10);
     }
 
+    @Test
+    void testFetchWithInvalidTableOrPartitions() throws Exception {
+        MetadataUpdater metadataUpdater1 =
+                new MetadataUpdater(clientConf, 
FLUSS_CLUSTER_EXTENSION.getRpcClient());
+        logFetcher =
+                new LogFetcher(
+                        DATA1_TABLE_INFO,
+                        null,
+                        logScannerStatus,
+                        clientConf,
+                        metadataUpdater1,
+                        TestingScannerMetricGroup.newInstance(),
+                        new RemoteFileDownloader(1));
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<?> future =
+                executor.submit(
+                        () -> {
+                            // If this test blocked, please checking whether 
it was blocked with
+                            // the same reason as 
https://github.com/apache/fluss/pull/1666
+                            for (int i = 0; i < 1000; i++) {
+                                logFetcher.sendFetches();
+                                logFetcher.invalidTableOrPartitions(
+                                        new LogFetcher.TableOrPartitions(
+                                                
Collections.singleton(tableId), null));
+                            }
+                        });
+
+        future.get(2, TimeUnit.MINUTES);

Review Comment:
   assert future is done in the end, and close the executor 



##########
fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java:
##########
@@ -63,9 +63,10 @@ public class MetadataUtils {
      * add those table into cluster.
      */
     public static Cluster sendMetadataRequestAndRebuildCluster(
-            AdminReadOnlyGateway gateway, Set<TablePath> tablePaths)
+            AdminReadOnlyGateway gateway, Set<TablePath> tablePaths, long 
metadataRequestTimeoutMs)

Review Comment:
   Please remove the timeout parameter, and use the default 30s in the test. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to