LadyForest commented on code in PR #21776:
URL: https://github.com/apache/flink/pull/21776#discussion_r1089971885
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java:
##########
@@ -40,12 +42,31 @@
extends AbstractSqlGatewayRestHandler<
EmptyRequestBody, GetApiVersionResponseBody,
EmptyMessageParameters> {
+ private final List<SqlGatewayRestAPIVersion> supportedVersions;
+
public GetApiVersionHandler(
SqlGatewayService service,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, GetApiVersionResponseBody,
EmptyMessageParameters>
messageHeaders) {
+ this(
+ service,
+ responseHeaders,
+ messageHeaders,
+ Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(SqlGatewayRestAPIVersion::isStableVersion)
+ .collect(Collectors.toList()));
Review Comment:
Nit: encapsulate the logic within `SqlGatewayRestAPIVersion` like
`getStableVersions` or `getSupportedVersions`?
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##########
@@ -429,4 +453,46 @@ private CompletableFuture<OperationStatusResponseBody>
closeOperationAsync(
private OperationHandle getOperationHandle(Supplier<String>
handleSupplier) {
return new OperationHandle(UUID.fromString(handleSupplier.get()));
}
+
+ private SqlGatewayRestAPIVersion negotiateVersion() {
+ List<SqlGatewayRestAPIVersion> gatewayVersions =
+ getResponse(
+ sendRequest(
+ GetApiVersionHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ // Currently, RestClient always uses
the latest REST API
+ // version to build the targetUrl.
However, it's possible
+ // that the client REST API version is
higher than the
+ // server REST API version. In this
case, the gateway will
+ // report Not Found Error to notify
the client.
+ //
+ // So, here use the lowest REST API
version to get the
+ // remote gateway version list and
then determine the
+ // connection version.
+ // TODO: Remove this after the REST
Client should allow
+ // to build the target URL without API
version.
+ Collections.min(
+
Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(
+
SqlGatewayRestAPIVersion
+
::isStableVersion)
+
.collect(Collectors.toList()))))
+ .getVersions().stream()
+ .map(SqlGatewayRestAPIVersion::valueOf)
+ .collect(Collectors.toList());
Review Comment:
I suppose this might be encapsulated in the `SqlGatewayRestAPIVersion` like
`getInitialStableVersion`
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##########
@@ -429,4 +453,46 @@ private CompletableFuture<OperationStatusResponseBody>
closeOperationAsync(
private OperationHandle getOperationHandle(Supplier<String>
handleSupplier) {
return new OperationHandle(UUID.fromString(handleSupplier.get()));
}
+
+ private SqlGatewayRestAPIVersion negotiateVersion() {
+ List<SqlGatewayRestAPIVersion> gatewayVersions =
+ getResponse(
+ sendRequest(
+ GetApiVersionHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ // Currently, RestClient always uses
the latest REST API
+ // version to build the targetUrl.
However, it's possible
+ // that the client REST API version is
higher than the
+ // server REST API version. In this
case, the gateway will
+ // report Not Found Error to notify
the client.
+ //
+ // So, here use the lowest REST API
version to get the
+ // remote gateway version list and
then determine the
+ // connection version.
+ // TODO: Remove this after the REST
Client should allow
+ // to build the target URL without API
version.
+ Collections.min(
+
Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(
+
SqlGatewayRestAPIVersion
+
::isStableVersion)
+
.collect(Collectors.toList()))))
+ .getVersions().stream()
+ .map(SqlGatewayRestAPIVersion::valueOf)
+ .collect(Collectors.toList());
+ SqlGatewayRestAPIVersion clientVersion =
SqlGatewayRestAPIVersion.getDefaultVersion();
+
+ if (gatewayVersions.contains(clientVersion)) {
+ return clientVersion;
+ } else {
+ SqlGatewayRestAPIVersion latestVersion =
+ RestAPIVersion.getLatestVersion(gatewayVersions);
+ if (latestVersion.equals(SqlGatewayRestAPIVersion.V1)) {
+ throw new SqlExecutionException(
+ "Currently SQL Client only supports to connect to the
REST endpoint whose API version is larger than V1.");
Review Comment:
Nit: suggested by Grammarly
`Currently SQL Client only supports to connect to the REST endpoint whose
API version is larger than V1.`
=>
`Currently, SQL Client only supports connecting to the REST endpoint with
API version larger than V1.`
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##########
@@ -339,13 +347,29 @@ private FetchResultsResponseBody
fetchResults(OperationHandle operationHandle, l
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(M messageHeaders, U
messageParameters, R request) {
+ Preconditions.checkNotNull(connectionVersion);
Review Comment:
Nit: add an error msg?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]