This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new fad97df6d [client] Thrown exception if ServerType mismatch. (#2606)
fad97df6d is described below
commit fad97df6d78d97c82dbe5b4014f8bbab4d9289d8
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Feb 11 16:32:18 2026 +0800
[client] Thrown exception if ServerType mismatch. (#2606)
---
.../fluss/client/metadata/MetadataUpdater.java | 5 +-
.../fluss/client/write/IdempotenceManager.java | 10 +++-
.../apache/fluss/client/write/WriterClient.java | 6 ++-
.../fluss/client/write/RecordAccumulatorTest.java | 3 +-
.../org/apache/fluss/client/write/SenderTest.java | 6 ++-
.../java/org/apache/fluss/cluster/ServerType.java | 27 ++++++++++-
.../InvalidServerTypeException.java} | 16 +++++--
.../fluss/rpc/netty/client/ServerConnection.java | 23 +++++++++
fluss-rpc/src/main/proto/FlussApi.proto | 1 +
.../apache/fluss/rpc/TestingGatewayService.java | 1 +
.../fluss/rpc/TestingTabletGatewayService.java | 3 +-
.../rpc/netty/authenticate/AuthenticationTest.java | 4 +-
.../authenticate/SaslAuthenticationITCase.java | 2 +-
.../rpc/netty/client/ServerConnectionTest.java | 55 ++++++++++++++++++++++
.../org/apache/fluss/server/RpcServiceBase.java | 1 +
15 files changed, 142 insertions(+), 21 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 5cb714044..e33a78344 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -307,10 +307,7 @@ public class MetadataUpdater {
try {
serverNode =
new ServerNode(
- -1,
- address.getHostString(),
- address.getPort(),
- ServerType.COORDINATOR);
+ -1, address.getHostString(),
address.getPort(), ServerType.UNKNOWN);
ServerNode finalServerNode = serverNode;
AdminReadOnlyGateway adminReadOnlyGateway =
GatewayClientProxy.createGatewayProxy(
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
index 5593c1999..7015b4e9a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
@@ -19,7 +19,9 @@ package org.apache.fluss.client.write;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.exception.AuthorizationException;
+import org.apache.fluss.exception.InvalidMetadataException;
import org.apache.fluss.exception.OutOfOrderSequenceException;
import org.apache.fluss.exception.UnknownWriterIdException;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -63,18 +65,21 @@ public class IdempotenceManager {
private final IdempotenceBucketMap idempotenceBucketMap;
private final int maxInflightRequestsPerBucket;
private final TabletServerGateway tabletServerGateway;
+ private final MetadataUpdater metadataUpdater;
private volatile long writerId;
public IdempotenceManager(
boolean idempotenceEnabled,
int maxInflightRequestsPerBucket,
- TabletServerGateway tabletServerGateway) {
+ TabletServerGateway tabletServerGateway,
+ MetadataUpdater metadataUpdater) {
this.idempotenceEnabled = idempotenceEnabled;
this.maxInflightRequestsPerBucket = maxInflightRequestsPerBucket;
this.idempotenceBucketMap = new IdempotenceBucketMap();
this.tabletServerGateway = tabletServerGateway;
this.writerId = NO_WRITER_ID;
+ this.metadataUpdater = metadataUpdater;
}
boolean idempotenceEnabled() {
@@ -307,6 +312,9 @@ public class IdempotenceManager {
if (t instanceof AuthorizationException || retryCount >=
RETRY_TIMES) {
throw t;
} else {
+ if (t instanceof InvalidMetadataException) {
+
metadataUpdater.updatePhysicalTableMetadata(tablePaths);
+ }
LOG.warn(
"Failed to init writer id, the retry count: {},
error message: {}",
retryCount,
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 788d385a1..0e184f09f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -259,8 +259,10 @@ public class WriterClient {
TabletServerGateway tabletServerGateway =
metadataUpdater.newRandomTabletServerClient();
return idempotenceEnabled
- ? new IdempotenceManager(true, maxInflightRequestPerBucket,
tabletServerGateway)
- : new IdempotenceManager(false, maxInflightRequestPerBucket,
tabletServerGateway);
+ ? new IdempotenceManager(
+ true, maxInflightRequestPerBucket,
tabletServerGateway, metadataUpdater)
+ : new IdempotenceManager(
+ false, maxInflightRequestPerBucket,
tabletServerGateway, metadataUpdater);
}
private short configureAcks(boolean idempotenceEnabled) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index d3f772f38..746d251d8 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -634,7 +634,8 @@ class RecordAccumulatorTest {
() -> cluster.getRandomTabletServer(),
RpcClient.create(
conf,
TestingClientMetricGroup.newInstance(), false),
- TabletServerGateway.class)),
+ TabletServerGateway.class),
+ null),
TestingWriterMetricGroup.newInstance(),
clock);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index c32b3f34e..16ba2f2ec 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -124,7 +124,8 @@ final class SenderTest {
new IdempotenceManager(
false,
MAX_INFLIGHT_REQUEST_PER_BUCKET,
- metadataUpdater.newRandomTabletServerClient()),
+ metadataUpdater.newRandomTabletServerClient(),
+ metadataUpdater),
maxRetries,
0);
// do a successful retry.
@@ -855,7 +856,8 @@ final class SenderTest {
return new IdempotenceManager(
idempotenceEnabled,
MAX_INFLIGHT_REQUEST_PER_BUCKET,
- metadataUpdater.newRandomTabletServerClient());
+ metadataUpdater.newRandomTabletServerClient(),
+ metadataUpdater);
}
private static boolean hasIdempotentRecords(TableBucket tb,
ProduceLogRequest request) {
diff --git
a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
index 0243a5bb6..31d8a9d27 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
@@ -19,6 +19,29 @@ package org.apache.fluss.cluster;
/** The type of server in Fluss cluster. */
public enum ServerType {
- COORDINATOR,
- TABLET_SERVER
+ COORDINATOR(1),
+ TABLET_SERVER(2),
+ UNKNOWN(-1);
+
+ private final int typeId;
+
+ ServerType(int typeId) {
+ this.typeId = typeId;
+ }
+
+ /** Get the ServerType from its typeId. */
+ public static ServerType fromTypeId(int typeId) {
+ if (typeId == COORDINATOR.typeId) {
+ return COORDINATOR;
+ } else if (typeId == TABLET_SERVER.typeId) {
+ return TABLET_SERVER;
+ } else {
+ return UNKNOWN;
+ }
+ }
+
+ /** Get the typeId of this ServerType. */
+ public int toTypeId() {
+ return typeId;
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java
similarity index 66%
copy from fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
copy to
fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java
index 0243a5bb6..4fa1f1f8a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java
+++
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java
@@ -15,10 +15,16 @@
* limitations under the License.
*/
-package org.apache.fluss.cluster;
+package org.apache.fluss.exception;
-/** The type of server in Fluss cluster. */
-public enum ServerType {
- COORDINATOR,
- TABLET_SERVER
+import org.apache.fluss.annotation.PublicEvolving;
+
+/** Exception thrown when a request is sent to a server of an invalid type.
since: 0.9 */
+@PublicEvolving
+public class InvalidServerTypeException extends InvalidMetadataException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidServerTypeException(String message) {
+ super(message);
+ }
}
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 2b52d87f6..f2e62c68a 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -19,8 +19,10 @@ package org.apache.fluss.rpc.netty.client;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.exception.DisconnectException;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidServerTypeException;
import org.apache.fluss.exception.NetworkException;
import org.apache.fluss.exception.RetriableAuthenticationException;
import org.apache.fluss.rpc.messages.ApiMessage;
@@ -372,6 +374,27 @@ final class ServerConnection {
return;
}
+ ApiVersionsResponse apiVersion = (ApiVersionsResponse) response;
+ if (apiVersion.hasServerType()) {
+ ServerType serverType =
ServerType.fromTypeId(apiVersion.getServerType());
+ // bootstrap servers are set to unknown type, because they may
coordinator or tablet.
+ if (node.serverType() != ServerType.UNKNOWN && serverType !=
node.serverType()) {
+ LOG.warn(
+ "Server type mismatch, expected: {}, actual: {}",
+ node.serverType(),
+ serverType);
+ close(
+ new InvalidServerTypeException(
+ "Server type mismatch, expected: "
+ + node.serverType()
+ + ", actual: "
+ + serverType
+ + ", node: "
+ + node));
+ return;
+ }
+ }
+
synchronized (lock) {
serverApiVersions =
new ServerApiVersions(((ApiVersionsResponse)
response).getApiVersionsList());
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index 8ea29f9b7..66f09f0dc 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -40,6 +40,7 @@ message ApiVersionsRequest {
message ApiVersionsResponse {
repeated PbApiVersion api_versions = 1;
+ optional int32 server_type = 2;
}
// get schema request and response
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
index db516f784..136ed2153 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingGatewayService.java
@@ -54,6 +54,7 @@ public class TestingGatewayService extends RpcGatewayService {
}
ApiVersionsResponse response = new ApiVersionsResponse();
response.addAllApiVersions(apiVersions);
+ response.setServerType(providerType().toTypeId());
processorThreadNames.add(Thread.currentThread().getName());
return CompletableFuture.completedFuture(response);
}
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 405e97114..7773ea5da 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -83,9 +83,10 @@ import java.util.concurrent.CompletableFuture;
/** A testing implementation of the {@link TabletServerGateway} interface. */
public class TestingTabletGatewayService extends TestingGatewayService
implements TabletServerGateway {
+
@Override
public ServerType providerType() {
- return null;
+ return ServerType.TABLET_SERVER;
}
@Override
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
index d9e2d9cdf..8ad62d94f 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java
@@ -250,10 +250,10 @@ public class AuthenticationTest {
// use client listener to connect to server
mutualAuthServerNode =
new ServerNode(
- 1, "localhost", availablePort1.getPort(),
ServerType.COORDINATOR);
+ 1, "localhost", availablePort1.getPort(),
ServerType.TABLET_SERVER);
usernamePasswordServerNode =
new ServerNode(
- 2, "localhost", availablePort2.getPort(),
ServerType.COORDINATOR);
+ 2, "localhost", availablePort2.getPort(),
ServerType.TABLET_SERVER);
}
}
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
index 25e00b9f9..a96a826aa 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java
@@ -204,7 +204,7 @@ public class SaslAuthenticationITCase {
// use client listener to connect to server
ServerNode serverNode =
new ServerNode(
- 1, "localhost", availablePort1.getPort(),
ServerType.COORDINATOR);
+ 1, "localhost", availablePort1.getPort(),
ServerType.TABLET_SERVER);
try (NettyClient nettyClient =
new NettyClient(clientConfig,
TestingClientMetricGroup.newInstance(), false)) {
ListTablesRequest request =
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 554926ac4..41e4bef2c 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.DisconnectException;
+import org.apache.fluss.exception.InvalidServerTypeException;
import org.apache.fluss.metrics.Gauge;
import org.apache.fluss.metrics.Metric;
import org.apache.fluss.metrics.MetricType;
@@ -31,7 +32,9 @@ import org.apache.fluss.metrics.registry.NOPMetricRegistry;
import org.apache.fluss.metrics.util.NOPMetricsGroup;
import org.apache.fluss.rpc.TestingGatewayService;
import org.apache.fluss.rpc.TestingTabletGatewayService;
+import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.ListDatabasesRequest;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
import org.apache.fluss.rpc.messages.PbTablePath;
@@ -44,6 +47,7 @@ import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.security.auth.AuthenticationFactory;
import org.apache.fluss.security.auth.ClientAuthenticator;
import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.fluss.utils.NetUtils;
@@ -55,7 +59,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL;
@@ -196,6 +202,55 @@ public class ServerConnectionTest {
connection.close().get();
}
+ @Test
+ void testWrongServerType() {
+ ServerNode wrongServerTypeNode =
+ new ServerNode(
+ serverNode.id(),
+ serverNode.host(),
+ serverNode.port(),
+ ServerType.COORDINATOR);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Bootstrap mockBootstrap =
+ new Bootstrap() {
+ @Override
+ public ChannelFuture connect(String host, int port) {
+ return bootstrap
+ .connect(host, port)
+ .addListener(f -> countDownLatch.await(1,
TimeUnit.MINUTES));
+ }
+ };
+ ServerConnection connection =
+ new ServerConnection(
+ mockBootstrap,
+ wrongServerTypeNode,
+ TestingClientMetricGroup.newInstance(),
+ clientAuthenticator,
+ (con, ignore) -> {},
+ false);
+
+ // Pending request will be rejected with InvalidServerTypeException
which is
+ // InvalidRequestException.
+ CompletableFuture<ApiMessage> future =
+ connection.send(ApiKeys.LIST_DATABASES, new
ListDatabasesRequest());
+ assertThat(future).isNotDone();
+ countDownLatch.countDown();
+ assertThatThrownBy(future::get)
+ .rootCause()
+ .isInstanceOf(InvalidServerTypeException.class)
+ .hasMessageContaining("Server type mismatch");
+
+ // Later request will be rejected with DisconnectException which is
also
+ // InvalidRequestException.
+ assertThatThrownBy(
+ () ->
+ connection
+ .send(ApiKeys.LIST_DATABASES, new
ListDatabasesRequest())
+ .get())
+ .rootCause()
+ .isInstanceOf(DisconnectException.class);
+ }
+
private void buildNettyServer() throws Exception {
try (NetUtils.Port availablePort = getAvailablePort();
NetUtils.Port availablePort2 = getAvailablePort()) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index afb7e71b5..2946fb93b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -198,6 +198,7 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
}
ApiVersionsResponse response = new ApiVersionsResponse();
response.addAllApiVersions(apiVersions);
+ response.setServerType(provider.toTypeId());
return CompletableFuture.completedFuture(response);
}