This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 3015f6b5606fcd8e9353d1d95242a880a9e8685c Author: Hongshun Wang <[email protected]> AuthorDate: Wed Feb 11 16:32:18 2026 +0800 [client] Thrown exception if ServerType mismatch. (#2606) --- .../fluss/client/metadata/MetadataUpdater.java | 5 +- .../fluss/client/write/IdempotenceManager.java | 10 +++- .../apache/fluss/client/write/WriterClient.java | 6 ++- .../fluss/client/write/RecordAccumulatorTest.java | 3 +- .../org/apache/fluss/client/write/SenderTest.java | 6 ++- .../java/org/apache/fluss/cluster/ServerType.java | 27 ++++++++++- .../InvalidServerTypeException.java} | 16 +++++-- .../fluss/rpc/netty/client/ServerConnection.java | 23 +++++++++ fluss-rpc/src/main/proto/FlussApi.proto | 1 + .../apache/fluss/rpc/TestingGatewayService.java | 1 + .../fluss/rpc/TestingTabletGatewayService.java | 3 +- .../rpc/netty/authenticate/AuthenticationTest.java | 4 +- .../authenticate/SaslAuthenticationITCase.java | 2 +- .../rpc/netty/client/ServerConnectionTest.java | 55 ++++++++++++++++++++++ .../org/apache/fluss/server/RpcServiceBase.java | 1 + 15 files changed, 142 insertions(+), 21 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java index 5cb714044..e33a78344 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java @@ -307,10 +307,7 @@ public class MetadataUpdater { try { serverNode = new ServerNode( - -1, - address.getHostString(), - address.getPort(), - ServerType.COORDINATOR); + -1, address.getHostString(), address.getPort(), ServerType.UNKNOWN); ServerNode finalServerNode = serverNode; AdminReadOnlyGateway adminReadOnlyGateway = GatewayClientProxy.createGatewayProxy( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java index 5593c1999..7015b4e9a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java @@ -19,7 +19,9 @@ package org.apache.fluss.client.write; import org.apache.fluss.annotation.Internal; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.InvalidMetadataException; import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.exception.UnknownWriterIdException; import org.apache.fluss.metadata.PhysicalTablePath; @@ -63,18 +65,21 @@ public class IdempotenceManager { private final IdempotenceBucketMap idempotenceBucketMap; private final int maxInflightRequestsPerBucket; private final TabletServerGateway tabletServerGateway; + private final MetadataUpdater metadataUpdater; private volatile long writerId; public IdempotenceManager( boolean idempotenceEnabled, int maxInflightRequestsPerBucket, - TabletServerGateway tabletServerGateway) { + TabletServerGateway tabletServerGateway, + MetadataUpdater metadataUpdater) { this.idempotenceEnabled = idempotenceEnabled; this.maxInflightRequestsPerBucket = maxInflightRequestsPerBucket; this.idempotenceBucketMap = new IdempotenceBucketMap(); this.tabletServerGateway = tabletServerGateway; this.writerId = NO_WRITER_ID; + this.metadataUpdater = metadataUpdater; } boolean idempotenceEnabled() { @@ -307,6 +312,9 @@ public class IdempotenceManager { if (t instanceof AuthorizationException || retryCount >= RETRY_TIMES) { throw t; } else { + if (t instanceof InvalidMetadataException) { + metadataUpdater.updatePhysicalTableMetadata(tablePaths); + } LOG.warn( "Failed to init writer id, the retry count: {}, error message: {}", retryCount, diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java index 788d385a1..0e184f09f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java @@ -259,8 +259,10 @@ public class WriterClient { TabletServerGateway tabletServerGateway = metadataUpdater.newRandomTabletServerClient(); return idempotenceEnabled - ? new IdempotenceManager(true, maxInflightRequestPerBucket, tabletServerGateway) - : new IdempotenceManager(false, maxInflightRequestPerBucket, tabletServerGateway); + ? new IdempotenceManager( + true, maxInflightRequestPerBucket, tabletServerGateway, metadataUpdater) + : new IdempotenceManager( + false, maxInflightRequestPerBucket, tabletServerGateway, metadataUpdater); } private short configureAcks(boolean idempotenceEnabled) { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index d3f772f38..746d251d8 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -634,7 +634,8 @@ class RecordAccumulatorTest { () -> cluster.getRandomTabletServer(), RpcClient.create( conf, TestingClientMetricGroup.newInstance(), false), - TabletServerGateway.class)), + TabletServerGateway.class), + null), TestingWriterMetricGroup.newInstance(), clock); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java index c32b3f34e..16ba2f2ec 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java @@ -124,7 +124,8 
@@ final class SenderTest { new IdempotenceManager( false, MAX_INFLIGHT_REQUEST_PER_BUCKET, - metadataUpdater.newRandomTabletServerClient()), + metadataUpdater.newRandomTabletServerClient(), + metadataUpdater), maxRetries, 0); // do a successful retry. @@ -855,7 +856,8 @@ final class SenderTest { return new IdempotenceManager( idempotenceEnabled, MAX_INFLIGHT_REQUEST_PER_BUCKET, - metadataUpdater.newRandomTabletServerClient()); + metadataUpdater.newRandomTabletServerClient(), + metadataUpdater); } private static boolean hasIdempotentRecords(TableBucket tb, ProduceLogRequest request) { diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java index 0243a5bb6..31d8a9d27 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java @@ -19,6 +19,29 @@ package org.apache.fluss.cluster; /** The type of server in Fluss cluster. */ public enum ServerType { - COORDINATOR, - TABLET_SERVER + COORDINATOR(1), + TABLET_SERVER(2), + UNKNOWN(-1); + + private final int typeId; + + ServerType(int typeId) { + this.typeId = typeId; + } + + /** Get the ServerType from its typeId. */ + public static ServerType fromTypeId(int typeId) { + if (typeId == COORDINATOR.typeId) { + return COORDINATOR; + } else if (typeId == TABLET_SERVER.typeId) { + return TABLET_SERVER; + } else { + return UNKNOWN; + } + } + + /** Get the typeId of this ServerType. 
*/ + public int toTypeId() { + return typeId; + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java similarity index 66% copy from fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java copy to fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java index 0243a5bb6..4fa1f1f8a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java @@ -15,10 +15,16 @@ * limitations under the License. */ -package org.apache.fluss.cluster; +package org.apache.fluss.exception; -/** The type of server in Fluss cluster. */ -public enum ServerType { - COORDINATOR, - TABLET_SERVER +import org.apache.fluss.annotation.PublicEvolving; + +/** Exception thrown when a request is sent to a server of an invalid type. since: 0.9 */ +@PublicEvolving +public class InvalidServerTypeException extends InvalidMetadataException { + private static final long serialVersionUID = 1L; + + public InvalidServerTypeException(String message) { + super(message); + } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index 2b52d87f6..f2e62c68a 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -19,8 +19,10 @@ package org.apache.fluss.rpc.netty.client; import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; import org.apache.fluss.exception.DisconnectException; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.InvalidServerTypeException; import 
org.apache.fluss.exception.NetworkException; import org.apache.fluss.exception.RetriableAuthenticationException; import org.apache.fluss.rpc.messages.ApiMessage; @@ -372,6 +374,27 @@ final class ServerConnection { return; } + ApiVersionsResponse apiVersion = (ApiVersionsResponse) response; + if (apiVersion.hasServerType()) { + ServerType serverType = ServerType.fromTypeId(apiVersion.getServerType()); + // bootstrap servers are set to unknown type, because they may be coordinators or tablet servers. + if (node.serverType() != ServerType.UNKNOWN && serverType != node.serverType()) { + LOG.warn( + "Server type mismatch, expected: {}, actual: {}", + node.serverType(), + serverType); + close( + new InvalidServerTypeException( + "Server type mismatch, expected: " + + node.serverType() + + ", actual: " + + serverType + + ", node: " + + node)); + return; + } + } + synchronized (lock) { serverApiVersions = new ServerApiVersions(((ApiVersionsResponse) response).getApiVersionsList()); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 8ea29f9b7..66f09f0dc 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -40,6 +40,7 @@ message ApiVersionsRequest { message ApiVersionsResponse { repeated PbApiVersion api_versions = 1; + optional int32 server_type = 2; } // get schema request and response diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java index db516f784..136ed2153 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java @@ -54,6 +54,7 @@ public class TestingGatewayService extends RpcGatewayService { } ApiVersionsResponse response = new ApiVersionsResponse(); response.addAllApiVersions(apiVersions); + response.setServerType(providerType().toTypeId()); 
processorThreadNames.add(Thread.currentThread().getName()); return CompletableFuture.completedFuture(response); } diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 405e97114..7773ea5da 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -83,9 +83,10 @@ import java.util.concurrent.CompletableFuture; /** A testing implementation of the {@link TabletServerGateway} interface. */ public class TestingTabletGatewayService extends TestingGatewayService implements TabletServerGateway { + @Override public ServerType providerType() { - return null; + return ServerType.TABLET_SERVER; } @Override diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java index d9e2d9cdf..8ad62d94f 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java @@ -250,10 +250,10 @@ public class AuthenticationTest { // use client listener to connect to server mutualAuthServerNode = new ServerNode( - 1, "localhost", availablePort1.getPort(), ServerType.COORDINATOR); + 1, "localhost", availablePort1.getPort(), ServerType.TABLET_SERVER); usernamePasswordServerNode = new ServerNode( - 2, "localhost", availablePort2.getPort(), ServerType.COORDINATOR); + 2, "localhost", availablePort2.getPort(), ServerType.TABLET_SERVER); } } diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java index 25e00b9f9..a96a826aa 100644 --- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java @@ -204,7 +204,7 @@ public class SaslAuthenticationITCase { // use client listener to connect to server ServerNode serverNode = new ServerNode( - 1, "localhost", availablePort1.getPort(), ServerType.COORDINATOR); + 1, "localhost", availablePort1.getPort(), ServerType.TABLET_SERVER); try (NettyClient nettyClient = new NettyClient(clientConfig, TestingClientMetricGroup.newInstance(), false)) { ListTablesRequest request = diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java index 554926ac4..41e4bef2c 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.DisconnectException; +import org.apache.fluss.exception.InvalidServerTypeException; import org.apache.fluss.metrics.Gauge; import org.apache.fluss.metrics.Metric; import org.apache.fluss.metrics.MetricType; @@ -31,7 +32,9 @@ import org.apache.fluss.metrics.registry.NOPMetricRegistry; import org.apache.fluss.metrics.util.NOPMetricsGroup; import org.apache.fluss.rpc.TestingGatewayService; import org.apache.fluss.rpc.TestingTabletGatewayService; +import org.apache.fluss.rpc.messages.ApiMessage; import org.apache.fluss.rpc.messages.GetTableSchemaRequest; +import org.apache.fluss.rpc.messages.ListDatabasesRequest; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.PbLookupReqForBucket; import org.apache.fluss.rpc.messages.PbTablePath; @@ -44,6 +47,7 @@ 
import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.security.auth.AuthenticationFactory; import org.apache.fluss.security.auth.ClientAuthenticator; import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup; import org.apache.fluss.utils.NetUtils; @@ -55,7 +59,9 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG; import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL; @@ -196,6 +202,55 @@ public class ServerConnectionTest { connection.close().get(); } + @Test + void testWrongServerType() { + ServerNode wrongServerTypeNode = + new ServerNode( + serverNode.id(), + serverNode.host(), + serverNode.port(), + ServerType.COORDINATOR); + CountDownLatch countDownLatch = new CountDownLatch(1); + Bootstrap mockBootstrap = + new Bootstrap() { + @Override + public ChannelFuture connect(String host, int port) { + return bootstrap + .connect(host, port) + .addListener(f -> countDownLatch.await(1, TimeUnit.MINUTES)); + } + }; + ServerConnection connection = + new ServerConnection( + mockBootstrap, + wrongServerTypeNode, + TestingClientMetricGroup.newInstance(), + clientAuthenticator, + (con, ignore) -> {}, + false); + + // Pending request will be rejected with InvalidServerTypeException which is + // InvalidRequestException. 
+ CompletableFuture<ApiMessage> future = + connection.send(ApiKeys.LIST_DATABASES, new ListDatabasesRequest()); + assertThat(future).isNotDone(); + countDownLatch.countDown(); + assertThatThrownBy(future::get) + .rootCause() + .isInstanceOf(InvalidServerTypeException.class) + .hasMessageContaining("Server type mismatch"); + + // Later request will be rejected with DisconnectException which is also + // InvalidRequestException. + assertThatThrownBy( + () -> + connection + .send(ApiKeys.LIST_DATABASES, new ListDatabasesRequest()) + .get()) + .rootCause() + .isInstanceOf(DisconnectException.class); + } + private void buildNettyServer() throws Exception { try (NetUtils.Port availablePort = getAvailablePort(); NetUtils.Port availablePort2 = getAvailablePort()) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index afb7e71b5..2946fb93b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -198,6 +198,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR } ApiVersionsResponse response = new ApiVersionsResponse(); response.addAllApiVersions(apiVersions); + response.setServerType(provider.toTypeId()); return CompletableFuture.completedFuture(response); }
