This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 3015f6b5606fcd8e9353d1d95242a880a9e8685c
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Feb 11 16:32:18 2026 +0800

    [client] Thrown exception if ServerType mismatch. (#2606)
---
 .../fluss/client/metadata/MetadataUpdater.java     |  5 +-
 .../fluss/client/write/IdempotenceManager.java     | 10 +++-
 .../apache/fluss/client/write/WriterClient.java    |  6 ++-
 .../fluss/client/write/RecordAccumulatorTest.java  |  3 +-
 .../org/apache/fluss/client/write/SenderTest.java  |  6 ++-
 .../java/org/apache/fluss/cluster/ServerType.java  | 27 ++++++++++-
 .../InvalidServerTypeException.java}               | 16 +++++--
 .../fluss/rpc/netty/client/ServerConnection.java   | 23 +++++++++
 fluss-rpc/src/main/proto/FlussApi.proto            |  1 +
 .../apache/fluss/rpc/TestingGatewayService.java    |  1 +
 .../fluss/rpc/TestingTabletGatewayService.java     |  3 +-
 .../rpc/netty/authenticate/AuthenticationTest.java |  4 +-
 .../authenticate/SaslAuthenticationITCase.java     |  2 +-
 .../rpc/netty/client/ServerConnectionTest.java     | 55 ++++++++++++++++++++++
 .../org/apache/fluss/server/RpcServiceBase.java    |  1 +
 15 files changed, 142 insertions(+), 21 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 5cb714044..e33a78344 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -307,10 +307,7 @@ public class MetadataUpdater {
             try {
                 serverNode =
                         new ServerNode(
-                                -1,
-                                address.getHostString(),
-                                address.getPort(),
-                                ServerType.COORDINATOR);
+                                -1, address.getHostString(), 
address.getPort(), ServerType.UNKNOWN);
                 ServerNode finalServerNode = serverNode;
                 AdminReadOnlyGateway adminReadOnlyGateway =
                         GatewayClientProxy.createGatewayProxy(
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
index 5593c1999..7015b4e9a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
@@ -19,7 +19,9 @@ package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.exception.AuthorizationException;
+import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.OutOfOrderSequenceException;
 import org.apache.fluss.exception.UnknownWriterIdException;
 import org.apache.fluss.metadata.PhysicalTablePath;
@@ -63,18 +65,21 @@ public class IdempotenceManager {
     private final IdempotenceBucketMap idempotenceBucketMap;
     private final int maxInflightRequestsPerBucket;
     private final TabletServerGateway tabletServerGateway;
+    private final MetadataUpdater metadataUpdater;
 
     private volatile long writerId;
 
     public IdempotenceManager(
             boolean idempotenceEnabled,
             int maxInflightRequestsPerBucket,
-            TabletServerGateway tabletServerGateway) {
+            TabletServerGateway tabletServerGateway,
+            MetadataUpdater metadataUpdater) {
         this.idempotenceEnabled = idempotenceEnabled;
         this.maxInflightRequestsPerBucket = maxInflightRequestsPerBucket;
         this.idempotenceBucketMap = new IdempotenceBucketMap();
         this.tabletServerGateway = tabletServerGateway;
         this.writerId = NO_WRITER_ID;
+        this.metadataUpdater = metadataUpdater;
     }
 
     boolean idempotenceEnabled() {
@@ -307,6 +312,9 @@ public class IdempotenceManager {
                 if (t instanceof AuthorizationException || retryCount >= 
RETRY_TIMES) {
                     throw t;
                 } else {
+                    if (t instanceof InvalidMetadataException) {
+                        
metadataUpdater.updatePhysicalTableMetadata(tablePaths);
+                    }
                     LOG.warn(
                             "Failed to init writer id, the retry count: {}, 
error message: {}",
                             retryCount,
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 788d385a1..0e184f09f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -259,8 +259,10 @@ public class WriterClient {
 
         TabletServerGateway tabletServerGateway = 
metadataUpdater.newRandomTabletServerClient();
         return idempotenceEnabled
-                ? new IdempotenceManager(true, maxInflightRequestPerBucket, 
tabletServerGateway)
-                : new IdempotenceManager(false, maxInflightRequestPerBucket, 
tabletServerGateway);
+                ? new IdempotenceManager(
+                        true, maxInflightRequestPerBucket, 
tabletServerGateway, metadataUpdater)
+                : new IdempotenceManager(
+                        false, maxInflightRequestPerBucket, 
tabletServerGateway, metadataUpdater);
     }
 
     private short configureAcks(boolean idempotenceEnabled) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index d3f772f38..746d251d8 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -634,7 +634,8 @@ class RecordAccumulatorTest {
                                 () -> cluster.getRandomTabletServer(),
                                 RpcClient.create(
                                         conf, 
TestingClientMetricGroup.newInstance(), false),
-                                TabletServerGateway.class)),
+                                TabletServerGateway.class),
+                        null),
                 TestingWriterMetricGroup.newInstance(),
                 clock);
     }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java 
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index c32b3f34e..16ba2f2ec 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -124,7 +124,8 @@ final class SenderTest {
                         new IdempotenceManager(
                                 false,
                                 MAX_INFLIGHT_REQUEST_PER_BUCKET,
-                                metadataUpdater.newRandomTabletServerClient()),
+                                metadataUpdater.newRandomTabletServerClient(),
+                                metadataUpdater),
                         maxRetries,
                         0);
         // do a successful retry.
@@ -855,7 +856,8 @@ final class SenderTest {
         return new IdempotenceManager(
                 idempotenceEnabled,
                 MAX_INFLIGHT_REQUEST_PER_BUCKET,
-                metadataUpdater.newRandomTabletServerClient());
+                metadataUpdater.newRandomTabletServerClient(),
+                metadataUpdater);
     }
 
     private static boolean hasIdempotentRecords(TableBucket tb, 
ProduceLogRequest request) {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java 
b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
index 0243a5bb6..31d8a9d27 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
@@ -19,6 +19,29 @@ package org.apache.fluss.cluster;
 
 /** The type of server in Fluss cluster. */
 public enum ServerType {
-    COORDINATOR,
-    TABLET_SERVER
+    COORDINATOR(1),
+    TABLET_SERVER(2),
+    UNKNOWN(-1);
+
+    private final int typeId;
+
+    ServerType(int typeId) {
+        this.typeId = typeId;
+    }
+
+    /** Get the ServerType from its typeId. */
+    public static ServerType fromTypeId(int typeId) {
+        if (typeId == COORDINATOR.typeId) {
+            return COORDINATOR;
+        } else if (typeId == TABLET_SERVER.typeId) {
+            return TABLET_SERVER;
+        } else {
+            return UNKNOWN;
+        }
+    }
+
+    /** Get the typeId of this ServerType. */
+    public int toTypeId() {
+        return typeId;
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java 
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java
similarity index 66%
copy from fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
copy to 
fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java
index 0243a5bb6..4fa1f1f8a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java
@@ -15,10 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.cluster;
+package org.apache.fluss.exception;
 
-/** The type of server in Fluss cluster. */
-public enum ServerType {
-    COORDINATOR,
-    TABLET_SERVER
+import org.apache.fluss.annotation.PublicEvolving;
+
+/** Exception thrown when a request is sent to a server of an invalid type. 
since: 0.9 */
+@PublicEvolving
+public class InvalidServerTypeException extends InvalidMetadataException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidServerTypeException(String message) {
+        super(message);
+    }
 }
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 2b52d87f6..f2e62c68a 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -19,8 +19,10 @@ package org.apache.fluss.rpc.netty.client;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.exception.DisconnectException;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidServerTypeException;
 import org.apache.fluss.exception.NetworkException;
 import org.apache.fluss.exception.RetriableAuthenticationException;
 import org.apache.fluss.rpc.messages.ApiMessage;
@@ -372,6 +374,27 @@ final class ServerConnection {
             return;
         }
 
+        ApiVersionsResponse apiVersion = (ApiVersionsResponse) response;
+        if (apiVersion.hasServerType()) {
+            ServerType serverType = 
ServerType.fromTypeId(apiVersion.getServerType());
+            // bootstrap servers are set to unknown type, because they may 
coordinator or tablet.
+            if (node.serverType() != ServerType.UNKNOWN && serverType != 
node.serverType()) {
+                LOG.warn(
+                        "Server type mismatch, expected: {}, actual: {}",
+                        node.serverType(),
+                        serverType);
+                close(
+                        new InvalidServerTypeException(
+                                "Server type mismatch, expected: "
+                                        + node.serverType()
+                                        + ", actual: "
+                                        + serverType
+                                        + ", node: "
+                                        + node));
+                return;
+            }
+        }
+
         synchronized (lock) {
             serverApiVersions =
                     new ServerApiVersions(((ApiVersionsResponse) 
response).getApiVersionsList());
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 8ea29f9b7..66f09f0dc 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -40,6 +40,7 @@ message ApiVersionsRequest {
 
 message ApiVersionsResponse {
   repeated PbApiVersion api_versions = 1;
+  optional int32 server_type = 2;
 }
 
 // get schema request and response
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
index db516f784..136ed2153 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
@@ -54,6 +54,7 @@ public class TestingGatewayService extends RpcGatewayService {
         }
         ApiVersionsResponse response = new ApiVersionsResponse();
         response.addAllApiVersions(apiVersions);
+        response.setServerType(providerType().toTypeId());
         processorThreadNames.add(Thread.currentThread().getName());
         return CompletableFuture.completedFuture(response);
     }
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 405e97114..7773ea5da 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -83,9 +83,10 @@ import java.util.concurrent.CompletableFuture;
 /** A testing implementation of the {@link TabletServerGateway} interface. */
 public class TestingTabletGatewayService extends TestingGatewayService
         implements TabletServerGateway {
+
     @Override
     public ServerType providerType() {
-        return null;
+        return ServerType.TABLET_SERVER;
     }
 
     @Override
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
index d9e2d9cdf..8ad62d94f 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
@@ -250,10 +250,10 @@ public class AuthenticationTest {
             // use client listener to connect to server
             mutualAuthServerNode =
                     new ServerNode(
-                            1, "localhost", availablePort1.getPort(), 
ServerType.COORDINATOR);
+                            1, "localhost", availablePort1.getPort(), 
ServerType.TABLET_SERVER);
             usernamePasswordServerNode =
                     new ServerNode(
-                            2, "localhost", availablePort2.getPort(), 
ServerType.COORDINATOR);
+                            2, "localhost", availablePort2.getPort(), 
ServerType.TABLET_SERVER);
         }
     }
 
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
index 25e00b9f9..a96a826aa 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
@@ -204,7 +204,7 @@ public class SaslAuthenticationITCase {
             // use client listener to connect to server
             ServerNode serverNode =
                     new ServerNode(
-                            1, "localhost", availablePort1.getPort(), 
ServerType.COORDINATOR);
+                            1, "localhost", availablePort1.getPort(), 
ServerType.TABLET_SERVER);
             try (NettyClient nettyClient =
                     new NettyClient(clientConfig, 
TestingClientMetricGroup.newInstance(), false)) {
                 ListTablesRequest request =
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 554926ac4..41e4bef2c 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.DisconnectException;
+import org.apache.fluss.exception.InvalidServerTypeException;
 import org.apache.fluss.metrics.Gauge;
 import org.apache.fluss.metrics.Metric;
 import org.apache.fluss.metrics.MetricType;
@@ -31,7 +32,9 @@ import org.apache.fluss.metrics.registry.NOPMetricRegistry;
 import org.apache.fluss.metrics.util.NOPMetricsGroup;
 import org.apache.fluss.rpc.TestingGatewayService;
 import org.apache.fluss.rpc.TestingTabletGatewayService;
+import org.apache.fluss.rpc.messages.ApiMessage;
 import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.ListDatabasesRequest;
 import org.apache.fluss.rpc.messages.LookupRequest;
 import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
 import org.apache.fluss.rpc.messages.PbTablePath;
@@ -44,6 +47,7 @@ import org.apache.fluss.rpc.protocol.ApiKeys;
 import org.apache.fluss.security.auth.AuthenticationFactory;
 import org.apache.fluss.security.auth.ClientAuthenticator;
 import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
 import org.apache.fluss.utils.NetUtils;
 
@@ -55,7 +59,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
 import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL;
@@ -196,6 +202,55 @@ public class ServerConnectionTest {
         connection.close().get();
     }
 
+    @Test
+    void testWrongServerType() {
+        ServerNode wrongServerTypeNode =
+                new ServerNode(
+                        serverNode.id(),
+                        serverNode.host(),
+                        serverNode.port(),
+                        ServerType.COORDINATOR);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        Bootstrap mockBootstrap =
+                new Bootstrap() {
+                    @Override
+                    public ChannelFuture connect(String host, int port) {
+                        return bootstrap
+                                .connect(host, port)
+                                .addListener(f -> countDownLatch.await(1, 
TimeUnit.MINUTES));
+                    }
+                };
+        ServerConnection connection =
+                new ServerConnection(
+                        mockBootstrap,
+                        wrongServerTypeNode,
+                        TestingClientMetricGroup.newInstance(),
+                        clientAuthenticator,
+                        (con, ignore) -> {},
+                        false);
+
+        // Pending request will be rejected with InvalidServerTypeException 
which is
+        // InvalidRequestException.
+        CompletableFuture<ApiMessage> future =
+                connection.send(ApiKeys.LIST_DATABASES, new 
ListDatabasesRequest());
+        assertThat(future).isNotDone();
+        countDownLatch.countDown();
+        assertThatThrownBy(future::get)
+                .rootCause()
+                .isInstanceOf(InvalidServerTypeException.class)
+                .hasMessageContaining("Server type mismatch");
+
+        // Later request will be rejected with DisconnectException which is 
also
+        // InvalidRequestException.
+        assertThatThrownBy(
+                        () ->
+                                connection
+                                        .send(ApiKeys.LIST_DATABASES, new 
ListDatabasesRequest())
+                                        .get())
+                .rootCause()
+                .isInstanceOf(DisconnectException.class);
+    }
+
     private void buildNettyServer() throws Exception {
         try (NetUtils.Port availablePort = getAvailablePort();
                 NetUtils.Port availablePort2 = getAvailablePort()) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java 
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index afb7e71b5..2946fb93b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -198,6 +198,7 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
         }
         ApiVersionsResponse response = new ApiVersionsResponse();
         response.addAllApiVersions(apiVersions);
+        response.setServerType(provider.toTypeId());
         return CompletableFuture.completedFuture(response);
     }
 

Reply via email to