This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e47748b72 [Client] Fix LogFetcher doesn't update bucket metadata when 
receive NotLeaderOrFollowerException in bucket level (#1885)
e47748b72 is described below

commit e47748b72ee72445882dc08a2c3952fe7af84660
Author: yunhong <[email protected]>
AuthorDate: Thu Dec 18 18:11:53 2025 +0800

    [Client] Fix LogFetcher doesn't update bucket metadata when receive 
NotLeaderOrFollowerException in bucket level (#1885)
---
 .../fluss/client/metadata/ClientSchemaGetter.java  |   2 +-
 .../fluss/client/table/scanner/log/LogFetcher.java |  60 ++-
 .../client/metadata/TestingClientSchemaGetter.java |  55 +++
 .../client/metadata/TestingMetadataUpdater.java    |  14 +-
 .../{LogFetcherTest.java => LogFetcherITCase.java} |   6 +-
 .../client/table/scanner/log/LogFetcherTest.java   | 402 +++++----------------
 .../server/tablet/TestTabletServerGateway.java     |  56 +--
 7 files changed, 210 insertions(+), 385 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
index 6b6a11d77..486f80744 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
@@ -42,7 +42,7 @@ public class ClientSchemaGetter implements SchemaGetter {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClientSchemaGetter.class);
 
     private final TablePath tablePath;
-    private final Map<Integer, Schema> schemasById;
+    protected final Map<Integer, Schema> schemasById;
     private final Admin admin;
     private volatile SchemaInfo latestSchemaInfo;
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index b19347772..a97bffac0 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -26,8 +26,10 @@ import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.cluster.BucketLocation;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.ApiException;
 import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.SchemaGetter;
@@ -48,6 +50,7 @@ import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
 import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
+import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.utils.IOUtils;
 import org.apache.fluss.utils.Projection;
@@ -217,14 +220,27 @@ public class LogFetcher implements Closeable {
             }
         }
 
-        if (isPartitioned && !partitionIds.isEmpty()) {
-            metadataUpdater.updateMetadata(Collections.singleton(tablePath), 
null, partitionIds);
-        } else if (needUpdate) {
-            metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+        try {
+            if (isPartitioned && !partitionIds.isEmpty()) {
+                metadataUpdater.updateMetadata(
+                        Collections.singleton(tablePath), null, partitionIds);
+            } else if (needUpdate) {
+                metadataUpdater.updateTableOrPartitionMetadata(tablePath, 
null);
+            }
+        } catch (Exception e) {
+            if (e instanceof PartitionNotExistException) {
+                // ignore this exception, this is probably happen because the 
partition is deleted.
+                // The fetcher can also work fine. The caller like flink can 
remove the partition
+                // from fetch list when receive exception.
+                LOG.warn("Receive PartitionNotExistException when update 
metadata, ignore it", e);
+            } else {
+                throw e;
+            }
         }
     }
 
-    private void sendFetchRequest(int destination, FetchLogRequest 
fetchLogRequest) {
+    @VisibleForTesting
+    void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
         TableOrPartitions tableOrPartitionsInFetchRequest =
                 getTableOrPartitionsInFetchRequest(fetchLogRequest);
         // TODO cache the tablet server gateway.
@@ -345,6 +361,14 @@ public class LogFetcher implements Closeable {
                                     respForBucket.getBucketId());
                     FetchLogResultForBucket fetchResultForBucket =
                             getFetchLogResultForBucket(tb, tablePath, 
respForBucket);
+
+                    // if error code is not NONE, it means the fetch log 
request failed, we need to
+                    // clear table bucket meta for InvalidMetadataException.
+                    if (fetchResultForBucket.getErrorCode() != 
Errors.NONE.code()) {
+                        ApiError error = 
ApiError.fromErrorMessage(respForBucket);
+                        handleFetchLogExceptionForBucket(tb, destination, 
error);
+                    }
+
                     Long fetchOffset = logScannerStatus.getBucketOffset(tb);
                     // if the offset is null, it means the bucket has been 
unsubscribed,
                     // we just set a Long.MAX_VALUE as the next fetch offset
@@ -387,6 +411,29 @@ public class LogFetcher implements Closeable {
         }
     }
 
+    private void handleFetchLogExceptionForBucket(TableBucket tb, int 
destination, ApiError error) {
+        ApiException exception = error.error().exception();
+        LOG.error("Failed to fetch log from node {} for bucket {}", 
destination, tb, exception);
+        if (exception instanceof InvalidMetadataException) {
+            LOG.warn(
+                    "Invalid metadata error in fetch log request. "
+                            + "Going to request metadata update.",
+                    exception);
+            long tableId = tb.getTableId();
+            TableOrPartitions tableOrPartitions;
+            if (tb.getPartitionId() == null) {
+                tableOrPartitions = new 
TableOrPartitions(Collections.singleton(tableId), null);
+            } else {
+                tableOrPartitions =
+                        new TableOrPartitions(
+                                null,
+                                Collections.singleton(
+                                        new TablePartition(tableId, 
tb.getPartitionId())));
+            }
+            invalidTableOrPartitions(tableOrPartitions);
+        }
+    }
+
     private void pendRemoteFetches(
             RemoteLogFetchInfo remoteLogFetchInfo, long firstFetchOffset, long 
highWatermark) {
         checkNotNull(remoteLogFetchInfo);
@@ -417,7 +464,8 @@ public class LogFetcher implements Closeable {
         }
     }
 
-    private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
+    @VisibleForTesting
+    Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
         Map<Integer, List<PbFetchLogReqForBucket>> fetchLogReqForBuckets = new 
HashMap<>();
         int readyForFetchCount = 0;
         Long tableId = null;
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
new file mode 100644
index 000000000..a983eea11
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.metadata;
+
+import org.apache.fluss.client.admin.FlussAdmin;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.RpcClient;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Testing class for {@link ClientSchemaGetter}. */
+public class TestingClientSchemaGetter extends ClientSchemaGetter {
+    public TestingClientSchemaGetter(
+            TablePath tablePath,
+            SchemaInfo latestSchemaInfo,
+            TestingMetadataUpdater metadataUpdater) {
+        super(
+                tablePath,
+                latestSchemaInfo,
+                new FlussAdmin(
+                        RpcClient.create(
+                                new Configuration(), 
TestingClientMetricGroup.newInstance(), false),
+                        metadataUpdater));
+    }
+
+    @Override
+    public Schema getSchema(int schemaId) {
+        return schemasById.get(schemaId);
+    }
+
+    @Override
+    public CompletableFuture<SchemaInfo> getSchemaInfoAsync(int schemaId) {
+        return CompletableFuture.completedFuture(
+                new SchemaInfo(schemasById.get(schemaId), schemaId));
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 1c8f825a9..206395135 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -42,13 +42,13 @@ import java.util.stream.Collectors;
 
 /** Testing class for metadata updater. */
 public class TestingMetadataUpdater extends MetadataUpdater {
-    private static final ServerNode COORDINATOR =
+    public static final ServerNode COORDINATOR =
             new ServerNode(0, "localhost", 90, ServerType.COORDINATOR);
-    private static final ServerNode NODE1 =
+    public static final ServerNode NODE1 =
             new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER, 
"rack1");
-    private static final ServerNode NODE2 =
+    public static final ServerNode NODE2 =
             new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER, 
"rack2");
-    private static final ServerNode NODE3 =
+    public static final ServerNode NODE3 =
             new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER, 
"rack3");
 
     private final TestCoordinatorGateway coordinatorGateway;
@@ -63,7 +63,7 @@ public class TestingMetadataUpdater extends MetadataUpdater {
                 new Configuration());
     }
 
-    private TestingMetadataUpdater(
+    public TestingMetadataUpdater(
             ServerNode coordinatorServer,
             List<ServerNode> tabletServers,
             Map<TablePath, TableInfo> tableInfos,
@@ -137,10 +137,6 @@ public class TestingMetadataUpdater extends 
MetadataUpdater {
         this.cluster = cluster;
     }
 
-    public void setResponseLogicId(int serverId, int responseLogicId) {
-        
tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
-    }
-
     @Override
     public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
         Set<TablePath> needUpdateTablePaths =
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
similarity index 99%
copy from 
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
copy to 
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
index bb36cb7f1..b6817b327 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
@@ -67,8 +67,8 @@ import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObje
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link LogFetcher}. */
-public class LogFetcherTest extends ClientToServerITCaseBase {
+/** IT test for {@link LogFetcher}. */
+public class LogFetcherITCase extends ClientToServerITCaseBase {
     private LogFetcher logFetcher;
     private long tableId;
     private final int bucketId0 = 0;
@@ -78,8 +78,6 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
     private MetadataUpdater metadataUpdater;
     private ClientSchemaGetter clientSchemaGetter;
 
-    // TODO covert this test to UT as kafka.
-
     @BeforeEach
     protected void setup() throws Exception {
         super.setup();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index bb36cb7f1..f06f88614 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -17,96 +17,67 @@
 
 package org.apache.fluss.client.table.scanner.log;
 
-import org.apache.fluss.client.admin.ClientToServerITCaseBase;
-import org.apache.fluss.client.admin.FlussAdmin;
 import org.apache.fluss.client.metadata.ClientSchemaGetter;
-import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.client.metadata.TestingClientSchemaGetter;
+import org.apache.fluss.client.metadata.TestingMetadataUpdater;
 import org.apache.fluss.client.metrics.TestingScannerMetricGroup;
 import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
-import org.apache.fluss.client.table.scanner.ScanRecord;
-import org.apache.fluss.cluster.Cluster;
-import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.BucketLocation;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableChange;
-import org.apache.fluss.metadata.TableInfo;
-import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.row.GenericRow;
-import org.apache.fluss.rpc.RpcClient;
-import org.apache.fluss.rpc.gateway.TabletServerGateway;
-import org.apache.fluss.rpc.messages.PbProduceLogRespForBucket;
-import org.apache.fluss.rpc.messages.ProduceLogResponse;
-import org.apache.fluss.testutils.DataTestUtils;
-import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
+import org.apache.fluss.rpc.messages.FetchLogRequest;
+import org.apache.fluss.rpc.messages.FetchLogResponse;
+import org.apache.fluss.rpc.protocol.ApiError;
+import org.apache.fluss.server.entity.FetchReqInfo;
+import org.apache.fluss.server.tablet.TestTabletServerGateway;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
-import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.client.metadata.TestingMetadataUpdater.NODE1;
+import static org.apache.fluss.client.metadata.TestingMetadataUpdater.NODE2;
+import static org.apache.fluss.client.metadata.TestingMetadataUpdater.NODE3;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA2;
-import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
-import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
-import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
-import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
-import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link LogFetcher}. */
-public class LogFetcherTest extends ClientToServerITCaseBase {
-    private LogFetcher logFetcher;
-    private long tableId;
-    private final int bucketId0 = 0;
-    private final int bucketId1 = 1;
-    private LogScannerStatus logScannerStatus;
-    private FlussAdmin admin;
-    private MetadataUpdater metadataUpdater;
-    private ClientSchemaGetter clientSchemaGetter;
+/** UT Test for {@link LogFetcher}. */
+public class LogFetcherTest {
+    private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0);
 
-    // TODO covert this test to UT as kafka.
+    private TestingMetadataUpdater metadataUpdater;
+    private LogFetcher logFetcher = null;
 
-    @BeforeEach
-    protected void setup() throws Exception {
-        super.setup();
-
-        // We create table data1NonPkTablePath previously.
-        tableId = createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
-        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+    // TODO Add more ut tests like kafka.
 
-        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
-        metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
-        
metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
-
-        Map<TableBucket, Long> scanBuckets = new HashMap<>();
-        // add bucket 0 and bucket 1 to log scanner status.
-        scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
-        scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
-        logScannerStatus = new LogScannerStatus();
-        logScannerStatus.assignScanBuckets(scanBuckets);
-        admin = new FlussAdmin(rpcClient, metadataUpdater);
-        clientSchemaGetter =
-                new ClientSchemaGetter(DATA1_TABLE_PATH, new 
SchemaInfo(DATA1_SCHEMA, 1), admin);
+    @BeforeEach
+    public void setup() {
+        metadataUpdater = initializeMetadataUpdater();
+        ClientSchemaGetter clientSchemaGetter =
+                new TestingClientSchemaGetter(
+                        DATA1_TABLE_PATH, new SchemaInfo(DATA1_SCHEMA, 0), 
metadataUpdater);
+        LogScannerStatus logScannerStatus = initializeLogScannerStatus();
         logFetcher =
                 new LogFetcher(
                         DATA1_TABLE_INFO,
                         null,
                         logScannerStatus,
-                        clientConf,
+                        new Configuration(),
                         metadataUpdater,
                         TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1),
@@ -114,259 +85,68 @@ public class LogFetcherTest extends 
ClientToServerITCaseBase {
     }
 
     @Test
-    void testFetchWithSchemaChange() throws Exception {
-        // add one batch records to tb0.
-        TableBucket tb0 = new TableBucket(tableId, bucketId0);
-        addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
-
-        // add new column(which equals to DATA2_ROW_TYPE)
-        admin.alterTable(
-                        DATA1_TABLE_PATH,
-                        Collections.singletonList(
-                                TableChange.addColumn(
-                                        "c",
-                                        DataTypes.STRING(),
-                                        null,
-                                        TableChange.ColumnPosition.last())),
-                        false)
-                .get();
-        // add one batch records with new schema to tb0.
-        addRecordsToBucket(
-                tb0,
-                genMemoryLogRecordsByObject(DATA2_ROW_TYPE, 2, 
CURRENT_LOG_MAGIC_VALUE, DATA2),
-                10L);
-
-        // Read data with old schema, thus DATA2 will be truncated as DATA1
-        List<GenericRow> expectedRows =
-                
DATA1.stream().map(DataTestUtils::row).collect(Collectors.toList());
-        
expectedRows.addAll(DATA1.stream().map(DataTestUtils::row).collect(Collectors.toList()));
-        logFetcher.sendFetches();
-        // The fetcher is async to fetch data, so we need to wait the result 
write to the
-        // logFetchBuffer.
-        retry(
-                Duration.ofMinutes(1),
-                () -> {
-                    assertThat(logFetcher.hasAvailableFetches()).isTrue();
-                    
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
-                });
-        Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
-        assertThat(records.size()).isEqualTo(1);
-        List<ScanRecord> scanRecords = records.get(tb0);
-        
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
-                .isEqualTo(expectedRows);
-
-        // read data with new schema, thus DATA2 will be appended with null 
value.
-        expectedRows =
-                DATA1.stream()
-                        .map(row -> DataTestUtils.row(row[0], row[1], null))
-                        .collect(Collectors.toList());
-        
expectedRows.addAll(DATA2.stream().map(DataTestUtils::row).collect(Collectors.toList()));
-        logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));
-        LogFetcher newSchemaLogFetcher =
-                new LogFetcher(
-                        new TableInfo(
-                                DATA1_TABLE_INFO.getTablePath(),
-                                tableId,
-                                2,
-                                DATA2_SCHEMA,
-                                DATA1_TABLE_INFO.getBucketKeys(),
-                                DATA1_TABLE_INFO.getPartitionKeys(),
-                                DATA1_TABLE_INFO.getNumBuckets(),
-                                DATA1_TABLE_INFO.getProperties(),
-                                DATA1_TABLE_INFO.getCustomProperties(),
-                                DATA1_TABLE_INFO.getComment().orElse(null),
-                                DATA1_TABLE_INFO.getCreatedTime(),
-                                DATA1_TABLE_INFO.getModifiedTime()),
-                        null,
-                        logScannerStatus,
-                        clientConf,
-                        metadataUpdater,
-                        TestingScannerMetricGroup.newInstance(),
-                        new RemoteFileDownloader(1),
-                        clientSchemaGetter);
-        newSchemaLogFetcher.sendFetches();
-        // The fetcher is async to fetch data, so we need to wait the result 
write to the
-        // logFetchBuffer.
-        retry(
-                Duration.ofMinutes(1),
-                () -> {
-                    
assertThat(newSchemaLogFetcher.hasAvailableFetches()).isTrue();
-                    
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
-                });
-        records = newSchemaLogFetcher.collectFetch();
-        assertThat(records.size()).isEqualTo(1);
-        assertThat(records.get(tb0)).hasSize(20);
-        scanRecords = records.get(tb0);
-        
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
-                .isEqualTo(expectedRows);
-        newSchemaLogFetcher.close();
-    }
-
-    @Test
-    void testFetch() throws Exception {
-        // add one batch records to tb0.
-        TableBucket tb0 = new TableBucket(tableId, bucketId0);
-        addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
-
-        // add one batch records to tb1.
-        TableBucket tb1 = new TableBucket(tableId, bucketId1);
-        addRecordsToBucket(tb1, genMemoryLogRecordsByObject(DATA1), 0L);
-
-        assertThat(logFetcher.hasAvailableFetches()).isFalse();
-        // collect fetch will be empty while no available fetch.
-        assertThat(logFetcher.collectFetch()).isEmpty();
-
-        // send fetcher to fetch data.
-        logFetcher.sendFetches();
-        // The fetcher is async to fetch data, so we need to wait the result 
write to the
-        // logFetchBuffer.
-        retry(
-                Duration.ofMinutes(1),
-                () -> {
-                    assertThat(logFetcher.hasAvailableFetches()).isTrue();
-                    
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
-                });
-
-        Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
-        assertThat(records.size()).isEqualTo(2);
-        assertThat(records.get(tb0).size()).isEqualTo(10);
-        assertThat(records.get(tb1).size()).isEqualTo(10);
-
-        // after collect fetch, the fetcher is empty.
-        assertThat(logFetcher.hasAvailableFetches()).isFalse();
-        assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(0);
-    }
-
-    @Test
-    void testFetchWhenDestinationIsNullInMetadata() throws Exception {
-        TableBucket tb0 = new TableBucket(tableId, bucketId0);
-        addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
-
-        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
-        MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, 
rpcClient);
-        
metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
-
-        int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb0);
-
-        // now, remove leader nodd ,so that fetch destination
-        // server node is null
-        Cluster oldCluster = metadataUpdater.getCluster();
-        Map<Integer, ServerNode> aliveTabletServersById =
-                new HashMap<>(oldCluster.getAliveTabletServers());
-        aliveTabletServersById.remove(leaderNode);
-        Cluster newCluster =
-                new Cluster(
-                        aliveTabletServersById,
-                        oldCluster.getCoordinatorServer(),
-                        oldCluster.getBucketLocationsByPath(),
-                        oldCluster.getTableIdByPath(),
-                        oldCluster.getPartitionIdByPath());
-        metadataUpdater = new MetadataUpdater(rpcClient, clientConf, 
newCluster);
-
-        LogScannerStatus logScannerStatus = new LogScannerStatus();
-        logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));
-
-        ClientSchemaGetter clientSchemaGetter =
-                new ClientSchemaGetter(
-                        DATA1_TABLE_PATH,
-                        new SchemaInfo(DATA1_SCHEMA, 1),
-                        new FlussAdmin(rpcClient, metadataUpdater));
-
-        LogFetcher logFetcher =
-                new LogFetcher(
-                        DATA1_TABLE_INFO,
-                        null,
-                        logScannerStatus,
-                        clientConf,
-                        metadataUpdater,
-                        TestingScannerMetricGroup.newInstance(),
-                        new RemoteFileDownloader(1),
-                        clientSchemaGetter);
-
-        // send fetches to fetch data, should have no available fetch.
-        logFetcher.sendFetches();
-        assertThat(logFetcher.hasAvailableFetches()).isFalse();
-
-        // then fetches again, should have available fetch.
-        // first send fetch is for update metadata
-        logFetcher.sendFetches();
-        // second send fetch will do real fetch data
-        logFetcher.sendFetches();
-        retry(
-                Duration.ofMinutes(1),
-                () -> {
-                    assertThat(logFetcher.hasAvailableFetches()).isTrue();
-                    
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
-                });
-        Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
-        assertThat(records.size()).isEqualTo(1);
-        assertThat(records.get(tb0).size()).isEqualTo(10);
+    void sendFetchRequestWithNotLeaderOrFollowerException() {
+        Map<Integer, FetchLogRequest> requestMap = 
logFetcher.prepareFetchLogRequests();
+        Set<Integer> serverSet = requestMap.keySet();
+        assertThat(serverSet).containsExactlyInAnyOrder(1);
+
+        assertThat(metadataUpdater.getBucketLocation(tb1))
+                .hasValue(
+                        new BucketLocation(
+                                PhysicalTablePath.of(DATA1_TABLE_PATH),
+                                tb1,
+                                1,
+                                new int[] {1, 2, 3}));
+
+        // send fetchLogRequest to serverId 1, which will respond with 
NotLeaderOrFollowerException
+        // as responseLogicId=1 do.
+        logFetcher.sendFetchRequest(1, requestMap.get(1));
+
+        // When NotLeaderOrFollowerException is received, the bucketLocation 
will be removed from
+        // metadata updater to trigger get the latest bucketLocation in next 
fetch round.
+        assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
     }
 
-    @Test
-    void testFetchWithInvalidTableOrPartitions() throws Exception {
-        MetadataUpdater metadataUpdater1 =
-                new MetadataUpdater(clientConf, 
FLUSS_CLUSTER_EXTENSION.getRpcClient());
-        ClientSchemaGetter clientSchemaGetter =
-                new ClientSchemaGetter(
-                        DATA1_TABLE_PATH,
-                        new SchemaInfo(DATA1_SCHEMA, 1),
-                        new FlussAdmin(FLUSS_CLUSTER_EXTENSION.getRpcClient(), 
metadataUpdater1));
-        logFetcher =
-                new LogFetcher(
-                        DATA1_TABLE_INFO,
-                        null,
-                        logScannerStatus,
-                        clientConf,
-                        metadataUpdater1,
-                        TestingScannerMetricGroup.newInstance(),
-                        new RemoteFileDownloader(1),
-                        clientSchemaGetter);
-
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        Future<?> future =
-                executor.submit(
-                        () -> {
-                            // If this test blocked, please checking whether 
it was blocked with
-                            // the same reason as 
https://github.com/apache/fluss/pull/1666
-                            for (int i = 0; i < 1000; i++) {
-                                logFetcher.sendFetches();
-                                logFetcher.invalidTableOrPartitions(
-                                        new LogFetcher.TableOrPartitions(
-                                                
Collections.singleton(tableId), null));
-                            }
-                        });
+    private TestingMetadataUpdater initializeMetadataUpdater() {
 
-        future.get(30, TimeUnit.SECONDS);
-        assertThat(future.isDone()).isTrue();
-        executor.shutdownNow();
+        return new TestingMetadataUpdater(
+                TestingMetadataUpdater.COORDINATOR,
+                Arrays.asList(NODE1, NODE2, NODE3),
+                Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO),
+                Collections.singletonMap(1, new TestingTabletServerGateway()),
+                new Configuration());
     }
 
-    private void addRecordsToBucket(
-            TableBucket tableBucket, MemoryLogRecords logRecords, long 
expectedBaseOffset)
-            throws Exception {
-        int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
-        TabletServerGateway leaderGateWay =
-                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
-        assertProduceLogResponse(
-                leaderGateWay
-                        .produceLog(
-                                newProduceLogRequest(
-                                        tableBucket.getTableId(),
-                                        tableBucket.getBucket(),
-                                        -1, // need ack, so we can make sure 
every batch is acked.
-                                        logRecords))
-                        .get(),
-                tableBucket.getBucket(),
-                expectedBaseOffset);
+    private LogScannerStatus initializeLogScannerStatus() {
+        Map<TableBucket, Long> scanBucketAndOffsets = new HashMap<>();
+        scanBucketAndOffsets.put(tb1, 0L);
+        LogScannerStatus status = new LogScannerStatus();
+        status.assignScanBuckets(scanBucketAndOffsets);
+        return status;
     }
 
-    private static void assertProduceLogResponse(
-            ProduceLogResponse produceLogResponse, int bucketId, Long 
baseOffset) {
-        assertThat(produceLogResponse.getBucketsRespsCount()).isEqualTo(1);
-        PbProduceLogRespForBucket produceLogRespForBucket =
-                produceLogResponse.getBucketsRespsList().get(0);
-        assertThat(produceLogRespForBucket.getBucketId()).isEqualTo(bucketId);
-        
assertThat(produceLogRespForBucket.getBaseOffset()).isEqualTo(baseOffset);
+    private static class TestingTabletServerGateway extends 
TestTabletServerGateway {
+
+        public TestingTabletServerGateway() {
+            super(false, Collections.emptySet());
+        }
+
+        @Override
+        public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest 
request) {
+            Map<TableBucket, FetchReqInfo> fetchLogData = 
getFetchLogData(request);
+            Map<TableBucket, FetchLogResultForBucket> resultForBucketMap = new 
HashMap<>();
+            // return with NotLeaderOrFollowerException.
+            fetchLogData.forEach(
+                    (tableBucket, fetchData) -> {
+                        FetchLogResultForBucket fetchLogResultForBucket =
+                                new FetchLogResultForBucket(
+                                        tableBucket,
+                                        ApiError.fromThrowable(
+                                                new 
NotLeaderOrFollowerException(
+                                                        "mock fetchLog fail 
for not leader or follower exception.")));
+                        resultForBucketMap.put(tableBucket, 
fetchLogResultForBucket);
+                    });
+            return 
CompletableFuture.completedFuture(makeFetchLogResponse(resultForBucketMap));
+        }
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 5750404b5..8ebcc5748 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -17,12 +17,9 @@
 
 package org.apache.fluss.server.tablet;
 
-import org.apache.fluss.exception.NotLeaderOrFollowerException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
-import org.apache.fluss.rpc.entity.LookupResultForBucket;
-import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.ApiMessage;
 import org.apache.fluss.rpc.messages.ApiVersionsRequest;
@@ -90,7 +87,6 @@ import org.apache.fluss.rpc.messages.TableExistsRequest;
 import org.apache.fluss.rpc.messages.TableExistsResponse;
 import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
 import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
-import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.ApiKeys;
 import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.utils.types.Tuple2;
@@ -109,10 +105,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
-import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLookupResponse;
-import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makePrefixLookupResponse;
-import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toLookupData;
-import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPrefixLookupData;
 
 /** A {@link TabletServerGateway} for test purpose. */
 public class TestTabletServerGateway implements TabletServerGateway {
@@ -120,9 +112,6 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
     private final boolean alwaysFail;
     private final AtomicLong writerId = new AtomicLong(0);
 
-    /** The id to define the response logic. */
-    private int responseLogicId;
-
     // Use concurrent queue for storing request and related completable future 
response so that
     // requests may be queried from a different thread.
     private final ConcurrentLinkedDeque<Tuple2<ApiMessage, 
CompletableFuture<?>>> requests =
@@ -131,7 +120,6 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
 
     public TestTabletServerGateway(boolean alwaysFail, Set<ApiKeys> 
ignoreApiKeys) {
         this.alwaysFail = alwaysFail;
-        this.responseLogicId = 0;
         this.ignoreApiKeys = ignoreApiKeys;
     }
 
@@ -201,48 +189,12 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
 
     @Override
     public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
-        Map<TableBucket, List<byte[]>> lookupData = toLookupData(request);
-        Map<TableBucket, LookupResultForBucket> errorResponseMap = new 
HashMap<>();
-        if (responseLogicId == 1) {
-            // return with NotLeaderOrFollowerException.
-            lookupData
-                    .keySet()
-                    .forEach(
-                            tb ->
-                                    errorResponseMap.put(
-                                            tb,
-                                            new LookupResultForBucket(
-                                                    tb,
-                                                    ApiError.fromThrowable(
-                                                            new 
NotLeaderOrFollowerException(
-                                                                    "mock not 
leader or follower exception.")))));
-            return 
CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap));
-        } else {
-            return null;
-        }
+        return null;
     }
 
     @Override
     public CompletableFuture<PrefixLookupResponse> 
prefixLookup(PrefixLookupRequest request) {
-        Map<TableBucket, List<byte[]>> prefixLookupData = 
toPrefixLookupData(request);
-        Map<TableBucket, PrefixLookupResultForBucket> errorResponseMap = new 
HashMap<>();
-        if (responseLogicId == 1) {
-            // return with NotLeaderOrFollowerException.
-            prefixLookupData
-                    .keySet()
-                    .forEach(
-                            tb ->
-                                    errorResponseMap.put(
-                                            tb,
-                                            new PrefixLookupResultForBucket(
-                                                    tb,
-                                                    ApiError.fromThrowable(
-                                                            new 
NotLeaderOrFollowerException(
-                                                                    "mock not 
leader or follower exception.")))));
-            return 
CompletableFuture.completedFuture(makePrefixLookupResponse(errorResponseMap));
-        } else {
-            throw new UnsupportedOperationException();
-        }
+        return null;
     }
 
     @Override
@@ -442,10 +394,6 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
         }
     }
 
-    public void setResponseLogicId(int responseLogicId) {
-        this.responseLogicId = responseLogicId;
-    }
-
     private StopReplicaResponse mockStopReplicaResponse(
             StopReplicaRequest stopReplicaRequest,
             @Nullable Integer errCode,


Reply via email to